This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8b0969777 [INLONG-6063][Manager] Use the data node info for ClickHouse
and Iceberg (#6064)
8b0969777 is described below
commit 8b0969777ec04a12f49c0bb7e8800ae94d641a85
Author: woofyzhao <[email protected]>
AuthorDate: Wed Oct 5 23:12:50 2022 +0800
[INLONG-6063][Manager] Use the data node info for ClickHouse and Iceberg
(#6064)
---
.../pojo/node/ck/ClickHouseDataNodeDTO.java | 58 +++++++++++++++
.../ClickHouseDataNodeInfo.java} | 40 ++++------
.../ClickHouseDataNodeRequest.java} | 33 ++-------
.../manager/pojo/node/hive/HiveDataNodeDTO.java | 4 -
.../pojo/node/hive/HiveDataNodeRequest.java | 3 -
.../IcebergDataNodeDTO.java} | 45 ++++-------
.../IcebergDataNodeInfo.java} | 39 ++++------
.../IcebergDataNodeRequest.java} | 32 +++-----
.../node/ck/ClickHouseDataNodeOperator.java | 85 +++++++++++++++++++++
.../node/iceberg/IcebergDataNodeOperator.java | 86 ++++++++++++++++++++++
.../sink/ck/ClickHouseResourceOperator.java | 25 ++++++-
.../resource/sink/hive/HiveResourceOperator.java | 8 +-
.../sink/iceberg/IcebergResourceOperator.java | 27 ++++++-
.../manager/service/sink/AbstractSinkOperator.java | 3 +
.../service/sink/ck/ClickHouseSinkOperator.java | 11 +++
.../service/sink/hive/HiveSinkOperator.java | 11 +++
.../service/sink/iceberg/IcebergSinkOperator.java | 13 ++++
.../service/core/impl/DataNodeServiceTest.java | 1 -
.../manager/service/sink/HiveSinkServiceTest.java | 2 +
.../service/sink/IcebergSinkServiceTest.java | 1 +
20 files changed, 382 insertions(+), 145 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
new file mode 100644
index 000000000..94bad14d8
--- /dev/null
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.ck;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import javax.validation.constraints.NotNull;
+
+/**
+ * ClickHouse data node info
+ */
+@Data
+@Builder
+@ApiModel("ClickHouse data node info")
+public class ClickHouseDataNodeDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); //
thread safe
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static ClickHouseDataNodeDTO
getFromRequest(ClickHouseDataNodeRequest request) throws Exception {
+ return ClickHouseDataNodeDTO.builder().build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static ClickHouseDataNodeDTO getFromJson(@NotNull String extParams)
{
+ try {
+
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ return OBJECT_MAPPER.readValue(extParams,
ClickHouseDataNodeDTO.class);
+ } catch (Exception e) {
+ throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeInfo.java
similarity index 51%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeInfo.java
index ab51ed666..b4b129127 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeInfo.java
@@ -15,47 +15,33 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.ck;
import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
/**
- * Hive data node request
+ * ClickHouse data node info
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.CLICKHOUSE)
+@ApiModel("ClickHouse data node info")
+public class ClickHouseDataNodeInfo extends DataNodeInfo {
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
-
- @ApiModelProperty("Version for Hive, such as: 3.2.1")
- private String hiveVersion;
-
- @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in
light mode, must include hive-site.xml")
- private String hiveConfDir;
-
- @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
- private String hdfsPath;
-
- @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
- private String warehouse;
-
- @ApiModelProperty("User and group information for writing data to HDFS")
- private String hdfsUgi;
-
- public HiveDataNodeRequest() {
- this.setType(DataNodeType.HIVE);
+ public ClickHouseDataNodeInfo() {
+ this.setType(DataNodeType.CLICKHOUSE);
}
+ @Override
+ public ClickHouseDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this,
ClickHouseDataNodeRequest::new);
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeRequest.java
similarity index 54%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeRequest.java
index ab51ed666..5a981190d 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeRequest.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.ck;
import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -27,35 +26,17 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
/**
- * Hive data node request
+ * ClickHouse data node request
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.CLICKHOUSE)
+@ApiModel("ClickHouse data node request")
+public class ClickHouseDataNodeRequest extends DataNodeRequest {
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
-
- @ApiModelProperty("Version for Hive, such as: 3.2.1")
- private String hiveVersion;
-
- @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in
light mode, must include hive-site.xml")
- private String hiveConfDir;
-
- @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
- private String hdfsPath;
-
- @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
- private String warehouse;
-
- @ApiModelProperty("User and group information for writing data to HDFS")
- private String hdfsUgi;
-
- public HiveDataNodeRequest() {
- this.setType(DataNodeType.HIVE);
+ public ClickHouseDataNodeRequest() {
+ this.setType(DataNodeType.CLICKHOUSE);
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
index 5922e61e4..3c75602f6 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
@@ -46,9 +46,6 @@ public class HiveDataNodeDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); //
thread safe
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
-
@ApiModelProperty("Version for Hive, such as: 3.2.1")
private String hiveVersion;
@@ -69,7 +66,6 @@ public class HiveDataNodeDTO {
*/
public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request)
throws Exception {
return HiveDataNodeDTO.builder()
- .jdbcUrl(request.getJdbcUrl())
.hiveVersion(request.getHiveVersion())
.hiveConfDir(request.getHiveConfDir())
.hdfsPath(request.getHdfsPath())
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
index ab51ed666..d7c4d6902 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
@@ -36,9 +36,6 @@ import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@ApiModel("Hive data node request")
public class HiveDataNodeRequest extends DataNodeRequest {
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
-
@ApiModelProperty("Version for Hive, such as: 3.2.1")
private String hiveVersion;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
similarity index 60%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
index 5922e61e4..b514866f2 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.iceberg;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,60 +33,45 @@ import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;
/**
- * Hive data node info
+ * Iceberg data node info
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("Hive data node info")
-public class HiveDataNodeDTO {
+@ApiModel("Iceberg data node info")
+public class IcebergDataNodeDTO {
- private static final Logger LOGGER =
LoggerFactory.getLogger(HiveDataNodeDTO.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IcebergDataNodeDTO.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); //
thread safe
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ @Builder.Default
+ private String catalogType = "HIVE";
- @ApiModelProperty("Version for Hive, such as: 3.2.1")
- private String hiveVersion;
-
- @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in
light mode, must include hive-site.xml")
- private String hiveConfDir;
-
- @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
- private String hdfsPath;
-
- @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+ @ApiModelProperty("Iceberg data warehouse dir")
private String warehouse;
- @ApiModelProperty("User and group information for writing data to HDFS")
- private String hdfsUgi;
-
/**
* Get the dto instance from the request
*/
- public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request)
throws Exception {
- return HiveDataNodeDTO.builder()
- .jdbcUrl(request.getJdbcUrl())
- .hiveVersion(request.getHiveVersion())
- .hiveConfDir(request.getHiveConfDir())
- .hdfsPath(request.getHdfsPath())
+ public static IcebergDataNodeDTO getFromRequest(IcebergDataNodeRequest
request) throws Exception {
+ return IcebergDataNodeDTO.builder()
+ .catalogType(request.getCatalogType())
.warehouse(request.getWarehouse())
- .hdfsUgi(request.getHdfsUgi())
.build();
}
/**
* Get the dto instance from the JSON string.
*/
- public static HiveDataNodeDTO getFromJson(@NotNull String extParams) {
+ public static IcebergDataNodeDTO getFromJson(@NotNull String extParams) {
try {
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
- return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class);
+ return OBJECT_MAPPER.readValue(extParams,
IcebergDataNodeDTO.class);
} catch (Exception e) {
- LOGGER.error("Failed to extract additional parameters for hive
data node: ", e);
+ LOGGER.error("Failed to extract additional parameters for iceberg
data node: ", e);
throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeInfo.java
similarity index 55%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeInfo.java
index ab51ed666..07ef3becc 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeInfo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.iceberg;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -23,39 +23,32 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
/**
- * Hive data node request
+ * Iceberg data node info
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.ICEBERG)
+@ApiModel("Iceberg data node info")
+public class IcebergDataNodeInfo extends DataNodeInfo {
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ private String catalogType = "HIVE";
- @ApiModelProperty("Version for Hive, such as: 3.2.1")
- private String hiveVersion;
-
- @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in
light mode, must include hive-site.xml")
- private String hiveConfDir;
-
- @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
- private String hdfsPath;
-
- @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+ @ApiModelProperty("Iceberg data warehouse dir")
private String warehouse;
- @ApiModelProperty("User and group information for writing data to HDFS")
- private String hdfsUgi;
-
- public HiveDataNodeRequest() {
- this.setType(DataNodeType.HIVE);
+ public IcebergDataNodeInfo() {
+ this.setType(DataNodeType.ICEBERG);
}
+ @Override
+ public IcebergDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this,
IcebergDataNodeRequest::new);
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeRequest.java
similarity index 57%
copy from
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeRequest.java
index ab51ed666..19adbfb6f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeRequest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.iceberg;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -27,35 +27,23 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
/**
- * Hive data node request
+ * Iceberg data node request
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.ICEBERG)
+@ApiModel("Iceberg data node request")
+public class IcebergDataNodeRequest extends DataNodeRequest {
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ private String catalogType = "HIVE";
- @ApiModelProperty("Version for Hive, such as: 3.2.1")
- private String hiveVersion;
-
- @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in
light mode, must include hive-site.xml")
- private String hiveConfDir;
-
- @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
- private String hdfsPath;
-
- @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+ @ApiModelProperty("Iceberg data warehouse dir")
private String warehouse;
- @ApiModelProperty("User and group information for writing data to HDFS")
- private String hdfsUgi;
-
- public HiveDataNodeRequest() {
- this.setType(DataNodeType.HIVE);
+ public IcebergDataNodeRequest() {
+ this.setType(DataNodeType.ICEBERG);
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
new file mode 100644
index 000000000..08e15de38
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.ck;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClickHouseDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.CLICKHOUSE;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+
+ ClickHouseDataNodeInfo ckDataNodeInfo = new ClickHouseDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, ckDataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ ClickHouseDataNodeDTO dto =
ClickHouseDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, ckDataNodeInfo);
+ }
+
+ return ckDataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
+ ClickHouseDataNodeRequest ckDataNodeRequest =
(ClickHouseDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(ckDataNodeRequest, targetEntity, true);
+ try {
+ ClickHouseDataNodeDTO dto =
ClickHouseDataNodeDTO.getFromRequest(ckDataNodeRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+            LOGGER.error("failed to set entity for clickhouse data node: ", e);
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
new file mode 100644
index 000000000..991764610
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.iceberg;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class IcebergDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IcebergDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.ICEBERG;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+
+ IcebergDataNodeInfo icebergDataNodeInfo = new IcebergDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, icebergDataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ IcebergDataNodeDTO dto =
IcebergDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, icebergDataNodeInfo);
+ }
+
+ LOGGER.debug("success to get iceberg data node from entity");
+ return icebergDataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity
targetEntity) {
+ IcebergDataNodeRequest icebergDataNodeRequest =
(IcebergDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(icebergDataNodeRequest, targetEntity,
true);
+ try {
+ IcebergDataNodeDTO dto =
IcebergDataNodeDTO.getFromRequest(icebergDataNodeRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ LOGGER.error("failed to set entity for iceberg data node: ", e);
+ throw new
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
index f8aa955e4..ee3a00032 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
@@ -18,16 +18,21 @@
package org.apache.inlong.manager.service.resource.sink.ck;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
@@ -51,6 +56,8 @@ public class ClickHouseResourceOperator implements
SinkResourceOperator {
private StreamSinkService sinkService;
@Autowired
private StreamSinkFieldEntityMapper clickHouseFieldMapper;
+ @Autowired
+ private DataNodeOperateHelper dataNodeHelper;
@Override
public Boolean accept(String sinkType) {
@@ -75,6 +82,22 @@ public class ClickHouseResourceOperator implements
SinkResourceOperator {
this.createTable(sinkInfo);
}
+ private ClickHouseSinkDTO getClickHouseInfo(SinkInfo sinkInfo) {
+ ClickHouseSinkDTO ckInfo =
ClickHouseSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+ // read from data node if not supplied by user
+ if (StringUtils.isBlank(ckInfo.getJdbcUrl())) {
+ String dataNodeName = sinkInfo.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "clickhouse jdbc url not
specified and data node is empty");
+ ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ dataNodeName, sinkInfo.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, ckInfo);
+ ckInfo.setJdbcUrl(dataNodeInfo.getUrl());
+ ckInfo.setPassword(dataNodeInfo.getToken());
+ }
+ return ckInfo;
+ }
+
private void createTable(SinkInfo sinkInfo) {
LOGGER.info("begin to create clickhouse table for sinkId={}",
sinkInfo.getId());
@@ -94,7 +117,7 @@ public class ClickHouseResourceOperator implements
SinkResourceOperator {
}
try {
- ClickHouseSinkDTO ckInfo =
ClickHouseSinkDTO.getFromJson(sinkInfo.getExtParams());
+ ClickHouseSinkDTO ckInfo = getClickHouseInfo(sinkInfo);
ClickHouseTableInfo tableInfo =
ClickHouseSinkDTO.getClickHouseTableInfo(ckInfo, columnList);
String url = ckInfo.getJdbcUrl();
String user = ckInfo.getUsername();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
index 9542c7e03..c318634f6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
@@ -86,12 +86,7 @@ public class HiveResourceOperator implements
SinkResourceOperator {
}
private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) {
- HiveSinkDTO hiveInfo = new HiveSinkDTO();
-
- if (StringUtils.isNotBlank(sinkInfo.getExtParams())) {
- HiveSinkDTO userSinkInfo =
HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
- CommonBeanUtils.copyProperties(userSinkInfo, hiveInfo);
- }
+ HiveSinkDTO hiveInfo =
HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
// read from data node if not supplied by user
if (StringUtils.isBlank(hiveInfo.getJdbcUrl())) {
@@ -101,7 +96,6 @@ public class HiveResourceOperator implements
SinkResourceOperator {
dataNodeName, sinkInfo.getSinkType());
CommonBeanUtils.copyProperties(dataNodeInfo, hiveInfo);
hiveInfo.setJdbcUrl(dataNodeInfo.getUrl());
- hiveInfo.setUsername(dataNodeInfo.getUsername());
hiveInfo.setPassword(dataNodeInfo.getToken());
}
return hiveInfo;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
index fbbbc413f..1d1ce97bb 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
@@ -18,16 +18,21 @@
package org.apache.inlong.manager.service.resource.sink.iceberg;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergColumnInfo;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkDTO;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergTableInfo;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
@@ -48,10 +53,14 @@ public class IcebergResourceOperator implements
SinkResourceOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(IcebergResourceOperator.class);
+ private static final String CATALOG_TYPE_HIVE = "HIVE";
+
@Autowired
private StreamSinkService sinkService;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
+ @Autowired
+ private DataNodeOperateHelper dataNodeHelper;
@Override
public Boolean accept(String sinkType) {
@@ -78,11 +87,27 @@ public class IcebergResourceOperator implements
SinkResourceOperator {
this.createTable(sinkInfo);
}
+ private IcebergSinkDTO getIcebergInfo(SinkInfo sinkInfo) {
+ IcebergSinkDTO icebergInfo =
IcebergSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+ // read uri from data node if not supplied by user
+ if (StringUtils.isBlank(icebergInfo.getCatalogUri())
+ && CATALOG_TYPE_HIVE.equals(icebergInfo.getCatalogType())) {
+ String dataNodeName = sinkInfo.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "iceberg catalog uri not
specified and data node is empty");
+ IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ dataNodeName, sinkInfo.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, icebergInfo);
+ icebergInfo.setCatalogUri(dataNodeInfo.getUrl());
+ }
+ return icebergInfo;
+ }
+
private void createTable(SinkInfo sinkInfo) {
LOGGER.info("begin to create iceberg table for sinkInfo={}", sinkInfo);
// Get all info from config
- IcebergSinkDTO icebergInfo =
IcebergSinkDTO.getFromJson(sinkInfo.getExtParams());
+ IcebergSinkDTO icebergInfo = getIcebergInfo(sinkInfo);
List<IcebergColumnInfo> columnInfoList = getColumnList(sinkInfo);
if (CollectionUtils.isEmpty(columnInfoList)) {
throw new IllegalArgumentException("no iceberg columns specified");
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index e4db70a34..db6cd23aa 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -57,6 +58,8 @@ public abstract class AbstractSinkOperator implements
StreamSinkOperator {
protected StreamSinkEntityMapper sinkMapper;
@Autowired
protected StreamSinkFieldEntityMapper sinkFieldMapper;
+ @Autowired
+ protected DataNodeOperateHelper dataNodeHelper;
/**
* Setting the parameters of the latest entity.
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
index 9442e7cb2..b5814ddff 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
@@ -18,9 +18,11 @@
package org.apache.inlong.manager.service.sink.ck;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -82,6 +84,15 @@ public class ClickHouseSinkOperator extends
AbstractSinkOperator {
}
ClickHouseSinkDTO dto =
ClickHouseSinkDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getJdbcUrl())) {
+ Preconditions.checkNotEmpty(entity.getDataNodeName(),
+ "clickhouse jdbc url unspecified and data node is empty");
+ ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), entity.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setJdbcUrl(dataNodeInfo.getUrl());
+ dto.setPassword(dataNodeInfo.getToken());
+ }
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
index 0c630c955..67b9e133a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
@@ -18,9 +18,11 @@
package org.apache.inlong.manager.service.sink.hive;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,15 @@ public class HiveSinkOperator extends AbstractSinkOperator {
}
HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getJdbcUrl())) {
+ Preconditions.checkNotEmpty(entity.getDataNodeName(),
+ "hive jdbc url unspecified and data node is empty");
+ HiveDataNodeInfo dataNodeInfo = (HiveDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), entity.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setJdbcUrl(dataNodeInfo.getUrl());
+ dto.setPassword(dataNodeInfo.getToken());
+ }
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index f5fc4e8d4..e8da4f8a4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -18,10 +18,12 @@
package org.apache.inlong.manager.service.sink.iceberg;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -48,6 +50,8 @@ public class IcebergSinkOperator extends AbstractSinkOperator
{
private static final Logger LOGGER =
LoggerFactory.getLogger(IcebergSinkOperator.class);
+ private static final String CATALOG_TYPE_HIVE = "HIVE";
+
@Autowired
private ObjectMapper objectMapper;
@@ -83,6 +87,15 @@ public class IcebergSinkOperator extends
AbstractSinkOperator {
}
IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getCatalogUri()) &&
CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
+ Preconditions.checkNotEmpty(entity.getDataNodeName(),
+ "iceberg catalog uri unspecified and data node is empty");
+ IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), entity.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setCatalogUri(dataNodeInfo.getUrl());
+ }
+
CommonBeanUtils.copyProperties(entity, sink, true);
CommonBeanUtils.copyProperties(dto, sink, true);
List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
index 428dd4f63..efd3311e8 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
@@ -48,7 +48,6 @@ public class DataNodeServiceTest extends ServiceBaseTest {
request.setToken(password);
request.setDescription("test cluster");
request.setInCharges(GLOBAL_OPERATOR);
- request.setJdbcUrl("127.0.0.1");
request.setToken("123456");
return dataNodeService.save(request, GLOBAL_OPERATOR);
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index 274bf9204..6197d4e81 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -38,6 +38,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
private final String globalStreamId = "stream1";
private final String globalOperator = "admin";
private final String sinkName = "default";
+ private final String jdbcUrl = "127.0.0.1:8080";
@Autowired
private StreamSinkService sinkService;
@@ -56,6 +57,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
sinkInfo.setSinkType(SinkType.HIVE);
sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
sinkInfo.setSinkName(sinkName);
+ sinkInfo.setJdbcUrl(jdbcUrl);
return sinkService.save(sinkInfo, globalOperator);
}
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
index fa3900163..59ed957be 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
@@ -56,6 +56,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest {
sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
sinkInfo.setSinkName(sinkName);
sinkInfo.setId((int) (Math.random() * 100000 + 1));
+ sinkInfo.setCatalogUri("thrift://127.0.0.1:9000");
return sinkService.save(sinkInfo, globalOperator);
}