This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 65674c18a34db32c2824ea78260057ba2aef0240 Author: fuweng11 <[email protected]> AuthorDate: Wed Aug 17 14:25:07 2022 +0800 [INLONG-5380][Manager] Modify the saving function of the data node (#5381) * Modify the saving function of the data node * Add params for Hive node * Fix the unit tests error Co-authored-by: healchow <[email protected]> --- .../client/api/inner/ClientFactoryTest.java | 17 ++-- .../inlong/manager/common/enums/ErrorCodeEnum.java | 3 + .../inlong/manager/common/util/HttpUtils.java | 26 ++++-- .../{DataNodeRequest.java => DataNodeInfo.java} | 44 +++++----- .../manager/pojo/node/DataNodePageRequest.java | 4 +- .../inlong/manager/pojo/node/DataNodeRequest.java | 16 ++-- .../manager/pojo/node/hive/HiveDataNodeDTO.java | 94 ++++++++++++++++++++++ .../manager/pojo/node/hive/HiveDataNodeInfo.java | 66 +++++++++++++++ .../pojo/node/hive/HiveDataNodeRequest.java | 61 ++++++++++++++ .../service/node/AbstractDataNodeOperator.java | 78 ++++++++++++++++++ .../manager/service/node/DataNodeOperator.java | 65 +++++++++++++++ .../service/node/DataNodeOperatorFactory.java | 46 +++++++++++ .../service/{core => node}/DataNodeService.java | 8 +- .../{core/impl => node}/DataNodeServiceImpl.java | 50 ++++++------ .../service/node/hive/HiveDataNodeOperator.java | 87 ++++++++++++++++++++ .../manager/service/sink/AbstractSinkOperator.java | 21 ++++- .../manager/service/sink/StreamSinkOperator.java | 7 ++ .../service/sink/StreamSinkServiceImpl.java | 33 +++----- .../service/core/impl/DataNodeServiceTest.java | 29 ++++--- .../manager/web/controller/DataNodeController.java | 10 +-- .../web/controller/DataNodeControllerTest.java | 36 +++++---- 21 files changed, 667 insertions(+), 134 deletions(-) diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java index 203fd10ae..26910e33b 100644 --- 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java +++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java @@ -62,8 +62,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest; import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest; -import org.apache.inlong.manager.pojo.node.DataNodeRequest; import org.apache.inlong.manager.pojo.node.DataNodeResponse; +import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink; import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink; @@ -965,9 +965,8 @@ class ClientFactoryTest { Response.success(1)) )) ); - DataNodeRequest request = new DataNodeRequest(); - request.setName("test_node"); - request.setType(DataNodeType.HIVE); + HiveDataNodeRequest request = new HiveDataNodeRequest(); + request.setName("test_hive_node"); Integer nodeId = dataNodeClient.save(request); Assertions.assertEquals(1, nodeId); } @@ -1007,9 +1006,8 @@ class ClientFactoryTest { ) ); - DataNodeRequest request = new DataNodeRequest(); - request.setName("test_node"); - request.setToken(DataNodeType.HIVE); + HiveDataNodeRequest request = new HiveDataNodeRequest(); + request.setName("test_hive_node"); PageInfo<DataNodeResponse> nodePageInfo = dataNodeClient.list(request); Assertions.assertEquals(JsonUtils.toJsonString(nodePageInfo.getList()), JsonUtils.toJsonString(nodeResponses)); } @@ -1025,10 +1023,9 @@ class ClientFactoryTest { ) ); - DataNodeRequest request = new DataNodeRequest(); + HiveDataNodeRequest request = new HiveDataNodeRequest(); request.setId(1); - request.setName("test_node"); - request.setType(DataNodeType.HIVE); + 
request.setName("test_hive_node"); Boolean isUpdate = dataNodeClient.update(request); Assertions.assertTrue(isUpdate); } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java index ce24b12ee..d6cc20776 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java @@ -60,6 +60,9 @@ public enum ErrorCodeEnum { CLUSTER_TYPE_NOT_SUPPORTED(1102, "Cluster type '%s' not supported"), CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"), + DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"), + DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"), + STREAM_NOT_FOUND(1201, "Inlong stream does not exist/no operation permission"), STREAM_ID_DUPLICATE(1202, "The current inlong group has a inlong stream with the same ID"), STREAM_OPT_NOT_ALLOWED(1203, diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java index 13b6bbad8..09c72464a 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java @@ -98,7 +98,7 @@ public class HttpUtils { /** * Send an HTTP request */ - public <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody, + public static <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody, HttpHeaders header, ParameterizedTypeReference<T> typeReference) { if (log.isDebugEnabled()) { log.debug("begin request to {} by request body {}", 
url, GSON.toJson(requestBody)); @@ -112,17 +112,31 @@ public class HttpUtils { return response.getBody(); } - public <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header, + /** + * Send GET request to the specified URL. + */ + public static <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params, + HttpHeaders header, ParameterizedTypeReference<T> typeReference) { + return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference); + } + + /** + * Send PUT request to the specified URL. + */ + public static <T> T putRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header, ParameterizedTypeReference<T> typeReference) { - return request(restTemplate, url, HttpMethod.POST, params, header, typeReference); + return request(restTemplate, url, HttpMethod.PUT, params, header, typeReference); } - public <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params, HttpHeaders header, + /** + * Send POST request to the specified URL. 
+ */ + public static <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header, ParameterizedTypeReference<T> typeReference) { - return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference); + return request(restTemplate, url, HttpMethod.POST, params, header, typeReference); } - private String buildUrlWithQueryParam(String url, Map<String, Object> params) { + private static String buildUrlWithQueryParam(String url, Map<String, Object> params) { if (params == null) { return url; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java similarity index 60% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java index 3db00c792..d17569e14 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java @@ -17,46 +17,42 @@ package org.apache.inlong.manager.pojo.node; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import org.apache.inlong.manager.common.validation.UpdateValidation; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotNull; +import java.util.Date; /** - * Data node request + * Data node info */ @Data -@Builder @NoArgsConstructor @AllArgsConstructor -@ApiModel("Data node request") -public class DataNodeRequest { +@ApiModel("Data 
node info") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type") +public abstract class DataNodeInfo { - @NotNull(groups = UpdateValidation.class) @ApiModelProperty(value = "Primary key") private Integer id; - @NotBlank(message = "node name cannot be blank") - @ApiModelProperty(value = "Node name") + @ApiModelProperty(value = "Data node name") private String name; - @NotBlank(message = "node type cannot be blank") - @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.") + @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.") private String type; - @ApiModelProperty(value = "Node url") + @ApiModelProperty(value = "Data node URL") private String url; - @ApiModelProperty(value = "Node username") + @ApiModelProperty("Data node username") private String username; - @ApiModelProperty(value = "Node token if needed") + @ApiModelProperty(value = "Data node token if needed") private String token; @ApiModelProperty(value = "Extended params") @@ -65,11 +61,23 @@ public class DataNodeRequest { @ApiModelProperty(value = "Description of the data node") private String description; - @NotBlank(message = "inCharges cannot be blank") - @ApiModelProperty(value = "Name of responsible person, separated by commas", required = true) + @ApiModelProperty(value = "Name of in charges, separated by commas") private String inCharges; + @ApiModelProperty(value = "Name of in creator") + private String creator; + + @ApiModelProperty(value = "Name of in modifier") + private String modifier; + @ApiModelProperty(value = "Version number") private Integer version; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private Date modifyTime; + + public abstract DataNodeRequest genRequest(); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java index 6f3862776..496aa9c9c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java @@ -31,10 +31,10 @@ import org.apache.inlong.manager.pojo.common.PageRequest; @ApiModel("Data node paging query request") public class DataNodePageRequest extends PageRequest { - @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.") + @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.") private String type; - @ApiModelProperty(value = "Node name") + @ApiModelProperty(value = "Data node name") private String name; @ApiModelProperty(value = "Keywords, name, url, etc.") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java index 3db00c792..54504e2dc 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java @@ -17,10 +17,10 @@ package org.apache.inlong.manager.pojo.node; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.AllArgsConstructor; -import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.inlong.manager.common.validation.UpdateValidation; @@ -32,31 +32,31 @@ import javax.validation.constraints.NotNull; * Data node request */ @Data -@Builder @NoArgsConstructor @AllArgsConstructor @ApiModel("Data node request") -public class DataNodeRequest { +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, 
property = "type") +public abstract class DataNodeRequest { @NotNull(groups = UpdateValidation.class) @ApiModelProperty(value = "Primary key") private Integer id; @NotBlank(message = "node name cannot be blank") - @ApiModelProperty(value = "Node name") + @ApiModelProperty(value = "Data node name") private String name; @NotBlank(message = "node type cannot be blank") - @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.") + @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.") private String type; - @ApiModelProperty(value = "Node url") + @ApiModelProperty(value = "Data node URL") private String url; - @ApiModelProperty(value = "Node username") + @ApiModelProperty(value = "Data node username") private String username; - @ApiModelProperty(value = "Node token if needed") + @ApiModelProperty(value = "Data node token if needed") private String token; @ApiModelProperty(value = "Extended params") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java new file mode 100644 index 000000000..5922e61e4 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.hive; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.NotNull; + +/** + * Hive data node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("Hive data node info") +public class HiveDataNodeDTO { + + private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeDTO.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe + + @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}") + private String jdbcUrl; + + @ApiModelProperty("Version for Hive, such as: 3.2.1") + private String hiveVersion; + + @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml") + private String hiveConfDir; + + @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000") + private String hdfsPath; + + @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/") + private String warehouse; + + @ApiModelProperty("User and group information for writing data to HDFS") + 
private String hdfsUgi; + + /** + * Get the dto instance from the request + */ + public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request) throws Exception { + return HiveDataNodeDTO.builder() + .jdbcUrl(request.getJdbcUrl()) + .hiveVersion(request.getHiveVersion()) + .hiveConfDir(request.getHiveConfDir()) + .hdfsPath(request.getHdfsPath()) + .warehouse(request.getWarehouse()) + .hdfsUgi(request.getHdfsUgi()) + .build(); + } + + /** + * Get the dto instance from the JSON string. + */ + public static HiveDataNodeDTO getFromJson(@NotNull String extParams) { + try { + OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class); + } catch (Exception e) { + LOGGER.error("Failed to extract additional parameters for hive data node: ", e); + throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java new file mode 100644 index 000000000..34b50ed06 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.hive; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; + +/** + * Hive data node info + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.HIVE) +@ApiModel("Hive data node info") +public class HiveDataNodeInfo extends DataNodeInfo { + + @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}") + private String jdbcUrl; + + @ApiModelProperty("Version for Hive, such as: 3.2.1") + private String hiveVersion; + + @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml") + private String hiveConfDir; + + @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000") + private String hdfsPath; + + @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/") + private String warehouse; + + @ApiModelProperty("User and group information for writing data to HDFS") + private String hdfsUgi; + + public HiveDataNodeInfo() { + this.setType(DataNodeType.HIVE); + } + + @Override + public HiveDataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, HiveDataNodeRequest::new); + } +} diff 
--git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java new file mode 100644 index 000000000..ab51ed666 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.pojo.node.hive; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +/** + * Hive data node request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.HIVE) +@ApiModel("Hive data node request") +public class HiveDataNodeRequest extends DataNodeRequest { + + @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}") + private String jdbcUrl; + + @ApiModelProperty("Version for Hive, such as: 3.2.1") + private String hiveVersion; + + @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml") + private String hiveConfDir; + + @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000") + private String hdfsPath; + + @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/") + private String warehouse; + + @ApiModelProperty("User and group information for writing data to HDFS") + private String hdfsUgi; + + public HiveDataNodeRequest() { + this.setType(DataNodeType.HIVE); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java new file mode 100644 index 000000000..d66ff1233 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Transactional; + +/** + * Default operation of data node. 
+ */ +public abstract class AbstractDataNodeOperator implements DataNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataNodeOperator.class); + + @Autowired + protected DataNodeEntityMapper dataNodeEntityMapper; + + @Override + @Transactional(rollbackFor = Throwable.class) + public Integer saveOpt(DataNodeRequest request, String operator) { + DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new); + // set the ext params + this.setTargetEntity(request, entity); + entity.setCreator(operator); + entity.setModifier(operator); + dataNodeEntityMapper.insert(entity); + + return entity.getId(); + } + + /** + * Set the parameters of the target entity. + * + * @param request data node request + * @param targetEntity entity which will set the new parameters + */ + protected abstract void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity); + + @Override + @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) + public void updateOpt(DataNodeRequest request, String operator) { + DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new); + // set the ext params + this.setTargetEntity(request, entity); + entity.setModifier(operator); + int rowCount = dataNodeEntityMapper.updateByIdSelective(entity); + if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { + LOGGER.error("data node has already updated with name={}, type={}, curVersion={}", request.getName(), + request.getType(), request.getVersion()); + throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); + } + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java new file mode 100644 index 000000000..ce579994e --- /dev/null +++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node; + +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +/** + * Interface of the data node operator. + */ +public interface DataNodeOperator { + + /** + * Determines whether the current instance matches the specified type. + */ + Boolean accept(String dataNodeType); + + /** + * Get the data node type. + * + * @return data node type string + */ + String getDataNodeType(); + + /** + * Save the data node info. + * + * @param request request of the data node + * @param operator name of the operator + * @return data node id after saving + */ + Integer saveOpt(DataNodeRequest request, String operator); + + /** + * Get the data node info from the given entity. + * + * @param entity get field value from the entity + * @return data node info after encapsulating + */ + DataNodeInfo getFromEntity(DataNodeEntity entity); + + /** + * Update the data node info. 
+ * + * @param request request of update + * @param operator name of operator + */ + void updateOpt(DataNodeRequest request, String operator); +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java new file mode 100644 index 000000000..fba9d84d7 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * Factory for {@link DataNodeOperator}. 
+ */ +@Service +public class DataNodeOperatorFactory { + + @Autowired + private List<DataNodeOperator> dataNodeOperatorList; + + /** + * Get a data node operator instance via the given type + */ + public DataNodeOperator getInstance(String type) { + return dataNodeOperatorList.stream() + .filter(inst -> inst.accept(type)) + .findFirst() + .orElseThrow(() -> new BusinessException( + String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), type))); + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java similarity index 91% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java index 027fd1bf1..29422e81d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.manager.service.core; +package org.apache.inlong.manager.service.node; import com.github.pagehelper.PageInfo; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodePageRequest; import org.apache.inlong.manager.pojo.node.DataNodeRequest; -import org.apache.inlong.manager.pojo.node.DataNodeResponse; /** * Data node service layer interface @@ -42,7 +42,7 @@ public interface DataNodeService { * @param id node id * @return node info */ - DataNodeResponse get(Integer id); + DataNodeInfo get(Integer id); /** * Paging query nodes according to conditions. 
@@ -50,7 +50,7 @@ public interface DataNodeService { * @param request page request conditions * @return node list */ - PageInfo<DataNodeResponse> list(DataNodePageRequest request); + PageInfo<DataNodeInfo> list(DataNodePageRequest request); /** * Update data node. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java similarity index 83% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java index e87edc293..893987663 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.inlong.manager.service.core.impl; +package org.apache.inlong.manager.service.node; import com.github.pagehelper.Page; import com.github.pagehelper.PageHelper; @@ -24,14 +24,12 @@ import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodePageRequest; import org.apache.inlong.manager.pojo.node.DataNodeRequest; -import org.apache.inlong.manager.pojo.node.DataNodeResponse; -import org.apache.inlong.manager.service.core.DataNodeService; import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +39,7 @@ import org.springframework.stereotype.Service; import java.sql.Connection; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * Data node service layer implementation @@ -52,6 +51,8 @@ public class DataNodeServiceImpl implements DataNodeService { @Autowired private DataNodeEntityMapper dataNodeMapper; + @Autowired + private DataNodeOperatorFactory operatorFactory; @Override public Integer save(DataNodeRequest request, String operator) { @@ -65,33 +66,38 @@ public class DataNodeServiceImpl implements DataNodeService { LOGGER.error(errMsg); throw new BusinessException(errMsg); } - DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new); - entity.setCreator(operator); - entity.setModifier(operator); - dataNodeMapper.insert(entity); - + // according to the 
data node type, save the data node information + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType()); + int id = dataNodeOperator.saveOpt(request, operator); LOGGER.debug("success to save data node={}", request); - return entity.getId(); + return id; } @Override - public DataNodeResponse get(Integer id) { + public DataNodeInfo get(Integer id) { DataNodeEntity entity = dataNodeMapper.selectById(id); if (entity == null) { LOGGER.error("data node not found by id={}", id); throw new BusinessException("data node not found"); } - DataNodeResponse response = CommonBeanUtils.copyProperties(entity, DataNodeResponse::new); + + String dataNodeType = entity.getType(); + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType); + DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity); LOGGER.debug("success to get data node info by id={}", id); - return response; + return dataNodeInfo; } @Override - public PageInfo<DataNodeResponse> list(DataNodePageRequest request) { + public PageInfo<DataNodeInfo> list(DataNodePageRequest request) { PageHelper.startPage(request.getPageNum(), request.getPageSize()); Page<DataNodeEntity> entityPage = (Page<DataNodeEntity>) dataNodeMapper.selectByCondition(request); - List<DataNodeResponse> responseList = CommonBeanUtils.copyListProperties(entityPage, DataNodeResponse::new); - PageInfo<DataNodeResponse> page = new PageInfo<>(responseList); + List<DataNodeInfo> list = entityPage.stream() + .map(entity -> { + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(entity.getType()); + return dataNodeOperator.getFromEntity(entity); + }).collect(Collectors.toList()); + PageInfo<DataNodeInfo> page = new PageInfo<>(list); page.setTotal(entityPage.getTotal()); LOGGER.debug("success to list data node by {}", request); return page; @@ -101,9 +107,8 @@ public class DataNodeServiceImpl implements DataNodeService { public Boolean update(DataNodeRequest request, String operator) { String name = 
request.getName(); String type = request.getType(); - - Integer id = request.getId(); DataNodeEntity exist = dataNodeMapper.selectByNameAndType(name, type); + Integer id = request.getId(); if (exist != null && !Objects.equals(id, exist.getId())) { String errMsg = String.format("data node already exist for name=%s type=%s", name, type); LOGGER.error(errMsg); @@ -121,13 +126,8 @@ public class DataNodeServiceImpl implements DataNodeService { LOGGER.error(errMsg); throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); } - CommonBeanUtils.copyProperties(request, entity, true); - entity.setModifier(operator); - int rowCount = dataNodeMapper.updateById(entity); - if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { - LOGGER.error(errMsg); - throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); - } + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType()); + dataNodeOperator.updateOpt(request, operator); LOGGER.info("success to update data node={}", request); return true; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java new file mode 100644 index 000000000..7cb25bd5f --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node.hive; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO; +import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo; +import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest; +import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class HiveDataNodeOperator extends AbstractDataNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String dataNodeType) { + return getDataNodeType().equals(dataNodeType); + } + + @Override + public String getDataNodeType() { + return DataNodeType.HIVE; + } + + @Override + public DataNodeInfo getFromEntity(DataNodeEntity entity) { + if (entity == null) { + throw 
new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); + } + + HiveDataNodeInfo hiveDataNodeInfo = new HiveDataNodeInfo(); + CommonBeanUtils.copyProperties(entity, hiveDataNodeInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + HiveDataNodeDTO dto = HiveDataNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, hiveDataNodeInfo); + } + + LOGGER.debug("success to get hive data node from entity"); + return hiveDataNodeInfo; + } + + @Override + protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { + HiveDataNodeRequest hiveDataNodeRequest = (HiveDataNodeRequest) request; + CommonBeanUtils.copyProperties(hiveDataNodeRequest, targetEntity, true); + try { + HiveDataNodeDTO dto = HiveDataNodeDTO.getFromRequest(hiveDataNodeRequest); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + LOGGER.debug("success to set entity for hive data node"); + } catch (Exception e) { + LOGGER.error("failed to set entity for hive data node: ", e); + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java index a771b2d32..2e790fff1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java @@ -26,15 +26,15 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.pojo.sink.SinkField; -import 
org.apache.inlong.manager.pojo.sink.SinkRequest; -import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -186,6 +186,21 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator { LOGGER.info("success to save sink fields"); } + @Override + public void deleteOpt(StreamSinkEntity entity, String operator) { + entity.setPreviousStatus(entity.getStatus()); + entity.setStatus(InlongConstants.DELETED_STATUS); + entity.setIsDeleted(entity.getId()); + entity.setModifier(operator); + int rowCount = sinkMapper.updateByPrimaryKeySelective(entity); + if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { + LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}", + entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion()); + throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); + } + sinkFieldMapper.logicDeleteAll(entity.getId()); + } + /** * Check the validity of sink fields. 
*/ diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java index 69e3b6c1c..e8231c419 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java @@ -89,4 +89,11 @@ public interface StreamSinkOperator { */ void updateFieldOpt(Boolean onlyAdd, SinkRequest request); + /** + * Delete the sink info. + * + * @param entity sink info needs to delete + * @param operator name of the operator + */ + void deleteOpt(StreamSinkEntity entity, String operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 311fd9329..f5860fb9e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -55,7 +55,6 @@ import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -106,13 +105,13 @@ public class StreamSinkServiceImpl implements StreamSinkService { } // According to the sink type, save sink information - StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType()); + StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType()); List<SinkField> fields = request.getSinkFieldList(); // Remove id in sinkField when save if (CollectionUtils.isNotEmpty(fields)) { 
fields.forEach(sinkField -> sinkField.setId(null)); } - int id = operation.saveOpt(request, operator); + int id = sinkOperator.saveOpt(request, operator); LOGGER.info("success to save sink info: {}", request); return id; @@ -126,8 +125,8 @@ public class StreamSinkServiceImpl implements StreamSinkService { LOGGER.error("sink not found by id={}", id); throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND); } - StreamSinkOperator operation = operatorFactory.getInstance(entity.getSinkType()); - StreamSink streamSink = operation.getFromEntity(entity); + StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType()); + StreamSink streamSink = sinkOperator.getFromEntity(entity); LOGGER.debug("success to get sink info by id={}", id); return streamSink; } @@ -191,8 +190,8 @@ public class StreamSinkServiceImpl implements StreamSinkService { } List<StreamSink> responseList = Lists.newArrayList(); for (Map.Entry<String, Page<StreamSinkEntity>> entry : sinkMap.entrySet()) { - StreamSinkOperator operation = operatorFactory.getInstance(entry.getKey()); - PageInfo<? extends StreamSink> pageInfo = operation.getPageInfo(entry.getValue()); + StreamSinkOperator sinkOperator = operatorFactory.getInstance(entry.getKey()); + PageInfo<? 
extends StreamSink> pageInfo = sinkOperator.getPageInfo(entry.getValue()); responseList.addAll(pageInfo.getList()); } // Encapsulate the paging query results into the PageInfo object to obtain related paging information @@ -230,8 +229,8 @@ public class StreamSinkServiceImpl implements StreamSinkService { fields.forEach(sinkField -> sinkField.setId(null)); } - StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType()); - operation.updateOpt(request, operator); + StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType()); + sinkOperator.updateOpt(request, operator); // The inlong group status is [Configuration successful], then asynchronously initiate // the [Single inlong stream resource creation] workflow @@ -266,20 +265,8 @@ public class StreamSinkServiceImpl implements StreamSinkService { StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id); Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage()); groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator); - - entity.setPreviousStatus(entity.getStatus()); - entity.setStatus(InlongConstants.DELETED_STATUS); - entity.setIsDeleted(id); - entity.setModifier(operator); - entity.setModifyTime(new Date()); - int rowCount = sinkMapper.updateByPrimaryKeySelective(entity); - if (rowCount != InlongConstants.AFFECTED_ONE_ROW) { - LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}", - entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion()); - throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED); - } - sinkFieldMapper.logicDeleteAll(id); - + StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType()); + sinkOperator.deleteOpt(entity, operator); LOGGER.info("success to delete sink info: {}", entity); return true; } diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java index a35aef416..41936336b 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java @@ -18,11 +18,12 @@ package org.apache.inlong.manager.service.core.impl; import com.github.pagehelper.PageInfo; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodePageRequest; -import org.apache.inlong.manager.pojo.node.DataNodeRequest; -import org.apache.inlong.manager.pojo.node.DataNodeResponse; +import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest; import org.apache.inlong.manager.service.ServiceBaseTest; -import org.apache.inlong.manager.service.core.DataNodeService; +import org.apache.inlong.manager.service.node.DataNodeService; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -39,7 +40,7 @@ public class DataNodeServiceTest extends ServiceBaseTest { * Save data node info. 
*/ public Integer saveOpt(String nodeName, String type, String url, String username, String password) { - DataNodeRequest request = new DataNodeRequest(); + HiveDataNodeRequest request = new HiveDataNodeRequest(); request.setName(nodeName); request.setType(type); request.setUrl(url); @@ -47,13 +48,15 @@ public class DataNodeServiceTest extends ServiceBaseTest { request.setToken(password); request.setDescription("test cluster"); request.setInCharges(GLOBAL_OPERATOR); + request.setJdbcUrl("127.0.0.1"); + request.setToken("123456"); return dataNodeService.save(request, GLOBAL_OPERATOR); } /** * Get data node list info. */ - public PageInfo<DataNodeResponse> listOpt(String type, String name) { + public PageInfo<DataNodeInfo> listOpt(String type, String name) { DataNodePageRequest request = new DataNodePageRequest(); request.setType(type); request.setName(name); @@ -65,7 +68,7 @@ public class DataNodeServiceTest extends ServiceBaseTest { */ public Boolean updateOpt(Integer id, String nodeName, String type, String url, String username, String password, Integer version) { - DataNodeRequest request = new DataNodeRequest(); + HiveDataNodeRequest request = new HiveDataNodeRequest(); request.setId(id); request.setName(nodeName); request.setType(type); @@ -86,7 +89,7 @@ public class DataNodeServiceTest extends ServiceBaseTest { @Test public void testDataService() { String nodeName = "hiveNode1"; - String type = "HIVE"; + String type = DataNodeType.HIVE; String url = "127.0.0.1:8080"; String usename = "admin"; String password = "123"; @@ -96,17 +99,17 @@ public class DataNodeServiceTest extends ServiceBaseTest { Assertions.assertNotNull(id); // test get data node - DataNodeResponse nodeResponse = dataNodeService.get(id); - Assertions.assertNotNull(nodeResponse); - Assertions.assertEquals(type, nodeResponse.getType()); + DataNodeInfo dataNodeInfo = dataNodeService.get(id); + Assertions.assertNotNull(dataNodeInfo); + Assertions.assertEquals(type, dataNodeInfo.getType()); // test 
get data node list - PageInfo<DataNodeResponse> listDataNode = this.listOpt(type, nodeName); + PageInfo<DataNodeInfo> listDataNode = this.listOpt(type, nodeName); Assertions.assertEquals(listDataNode.getTotal(), 1); // test update data node - String newNodeName = "kafkaNode1"; - String newType = "KAFKA"; + String newNodeName = "hiveNode2"; + String newType = DataNodeType.HIVE; String newUrl = "127.0.0.1:8083"; String newUsername = "admin2"; String newPassword = "456"; diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java index a8c8a81bd..802181c00 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java @@ -22,13 +22,13 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiOperation; import org.apache.inlong.manager.common.enums.OperationType; -import org.apache.inlong.manager.pojo.common.Response; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodePageRequest; import org.apache.inlong.manager.pojo.node.DataNodeRequest; -import org.apache.inlong.manager.pojo.node.DataNodeResponse; import org.apache.inlong.manager.pojo.user.UserRoleCode; -import org.apache.inlong.manager.service.core.DataNodeService; +import org.apache.inlong.manager.service.node.DataNodeService; import org.apache.inlong.manager.service.operationlog.OperationLog; import org.apache.inlong.manager.service.user.LoginUserUtils; import org.apache.shiro.authz.annotation.RequiresRoles; @@ -65,13 +65,13 @@ public class 
DataNodeController { @GetMapping(value = "/node/get/{id}") @ApiOperation(value = "Get node by id") @ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true) - public Response<DataNodeResponse> get(@PathVariable Integer id) { + public Response<DataNodeInfo> get(@PathVariable Integer id) { return Response.success(dataNodeService.get(id)); } @PostMapping(value = "/node/list") @ApiOperation(value = "List data node") - public Response<PageInfo<DataNodeResponse>> list(@RequestBody DataNodePageRequest request) { + public Response<PageInfo<DataNodeInfo>> list(@RequestBody DataNodePageRequest request) { return Response.success(dataNodeService.list(request)); } diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java index 99223394f..ba7b10353 100644 --- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java +++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java @@ -17,12 +17,14 @@ package org.apache.inlong.manager.web.controller; -import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.pojo.node.DataNodeRequest; -import org.apache.inlong.manager.pojo.node.DataNodeResponse; import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.DataNodeResponse; +import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest; import 
org.apache.inlong.manager.web.WebBaseTest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -36,15 +38,15 @@ class DataNodeControllerTest extends WebBaseTest { @Resource DataNodeEntityMapper dataNodeEntityMapper; - DataNodeRequest getDataNodeRequest() { - return DataNodeRequest.builder() - .name("hiveNode1") - .type("HIVE") - .url("127.0.0.1:8080") - .username("admin") - .token("123") - .inCharges("admin") - .build(); + HiveDataNodeRequest getHiveDataNodeRequest() { + HiveDataNodeRequest hiveDataNodeRequest = new HiveDataNodeRequest(); + hiveDataNodeRequest.setName("hiveNode1"); + hiveDataNodeRequest.setType(DataNodeType.HIVE); + hiveDataNodeRequest.setUrl("127.0.0.1:8080"); + hiveDataNodeRequest.setUsername("admin"); + hiveDataNodeRequest.setToken("123"); + hiveDataNodeRequest.setInCharges("admin"); + return hiveDataNodeRequest; } @Test @@ -52,7 +54,7 @@ class DataNodeControllerTest extends WebBaseTest { logout(); operatorLogin(); - MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest()); + MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest()); Response<Integer> response = getResBody(mvcResult, Integer.class); Assertions.assertEquals("Current user [operator] has no permission to access URL", response.getErrMsg()); @@ -61,7 +63,7 @@ class DataNodeControllerTest extends WebBaseTest { @Test void testSaveAndGetAndDelete() throws Exception { // save - MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest()); + MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest()); Integer dataNodeId = getResBodyObj(mvcResult, Integer.class); Assertions.assertNotNull(dataNodeId); @@ -71,7 +73,7 @@ class DataNodeControllerTest extends WebBaseTest { DataNodeResponse dataNode = getResBodyObj(getResult, DataNodeResponse.class); Assertions.assertNotNull(dataNode); - Assertions.assertEquals(getDataNodeRequest().getName(), 
dataNode.getName()); + Assertions.assertEquals(getHiveDataNodeRequest().getName(), dataNode.getName()); // delete MvcResult deleteResult = deleteForSuccessMvcResult("/api/node/delete/{id}", dataNodeId); @@ -99,7 +101,7 @@ class DataNodeControllerTest extends WebBaseTest { dataNodeEntityMapper.insert(nodeEntity); - DataNodeRequest request = getDataNodeRequest(); + DataNodeRequest request = getHiveDataNodeRequest(); request.setId(nodeEntity.getId()); request.setName("test447777"); request.setVersion(nodeEntity.getVersion()); @@ -114,7 +116,7 @@ class DataNodeControllerTest extends WebBaseTest { @Test void testUpdateFailByNoId() throws Exception { - MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getDataNodeRequest()); + MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getHiveDataNodeRequest()); Response<Boolean> response = getResBody(mvcResult, Boolean.class); Assertions.assertFalse(response.isSuccess());
