This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b0969777 [INLONG-6063][Manager] Use the data node info for ClickHouse 
and Iceberg (#6064)
8b0969777 is described below

commit 8b0969777ec04a12f49c0bb7e8800ae94d641a85
Author: woofyzhao <[email protected]>
AuthorDate: Wed Oct 5 23:12:50 2022 +0800

    [INLONG-6063][Manager] Use the data node info for ClickHouse and Iceberg 
(#6064)
---
 .../pojo/node/ck/ClickHouseDataNodeDTO.java        | 58 +++++++++++++++
 .../ClickHouseDataNodeInfo.java}                   | 40 ++++------
 .../ClickHouseDataNodeRequest.java}                | 33 ++-------
 .../manager/pojo/node/hive/HiveDataNodeDTO.java    |  4 -
 .../pojo/node/hive/HiveDataNodeRequest.java        |  3 -
 .../IcebergDataNodeDTO.java}                       | 45 ++++-------
 .../IcebergDataNodeInfo.java}                      | 39 ++++------
 .../IcebergDataNodeRequest.java}                   | 32 +++-----
 .../node/ck/ClickHouseDataNodeOperator.java        | 85 +++++++++++++++++++++
 .../node/iceberg/IcebergDataNodeOperator.java      | 86 ++++++++++++++++++++++
 .../sink/ck/ClickHouseResourceOperator.java        | 25 ++++++-
 .../resource/sink/hive/HiveResourceOperator.java   |  8 +-
 .../sink/iceberg/IcebergResourceOperator.java      | 27 ++++++-
 .../manager/service/sink/AbstractSinkOperator.java |  3 +
 .../service/sink/ck/ClickHouseSinkOperator.java    | 11 +++
 .../service/sink/hive/HiveSinkOperator.java        | 11 +++
 .../service/sink/iceberg/IcebergSinkOperator.java  | 13 ++++
 .../service/core/impl/DataNodeServiceTest.java     |  1 -
 .../manager/service/sink/HiveSinkServiceTest.java  |  2 +
 .../service/sink/IcebergSinkServiceTest.java       |  1 +
 20 files changed, 382 insertions(+), 145 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
new file mode 100644
index 000000000..94bad14d8
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.ck;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import javax.validation.constraints.NotNull;
+
+/**
+ * ClickHouse data node info
+ */
+@Data
+@Builder
+@ApiModel("ClickHouse data node info")
+public class ClickHouseDataNodeDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // 
thread safe
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static ClickHouseDataNodeDTO 
getFromRequest(ClickHouseDataNodeRequest request) throws Exception {
+        return ClickHouseDataNodeDTO.builder().build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static ClickHouseDataNodeDTO getFromJson(@NotNull String extParams) 
{
+        try {
+            
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+            return OBJECT_MAPPER.readValue(extParams, 
ClickHouseDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeInfo.java
similarity index 51%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeInfo.java
index ab51ed666..b4b129127 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeInfo.java
@@ -15,47 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.ck;
 
 import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 
 /**
- * Hive data node request
+ * ClickHouse data node info
  */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.CLICKHOUSE)
+@ApiModel("ClickHouse data node info")
+public class ClickHouseDataNodeInfo extends DataNodeInfo {
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
-
-    @ApiModelProperty("Version for Hive, such as: 3.2.1")
-    private String hiveVersion;
-
-    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in 
light mode, must include hive-site.xml")
-    private String hiveConfDir;
-
-    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
-    private String hdfsPath;
-
-    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
-    private String warehouse;
-
-    @ApiModelProperty("User and group information for writing data to HDFS")
-    private String hdfsUgi;
-
-    public HiveDataNodeRequest() {
-        this.setType(DataNodeType.HIVE);
+    public ClickHouseDataNodeInfo() {
+        this.setType(DataNodeType.CLICKHOUSE);
     }
 
+    @Override
+    public ClickHouseDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, 
ClickHouseDataNodeRequest::new);
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeRequest.java
similarity index 54%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeRequest.java
index ab51ed666..5a981190d 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeRequest.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.ck;
 
 import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -27,35 +26,17 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 
 /**
- * Hive data node request
+ * ClickHouse data node request
  */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.CLICKHOUSE)
+@ApiModel("ClickHouse data node request")
+public class ClickHouseDataNodeRequest extends DataNodeRequest {
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
-
-    @ApiModelProperty("Version for Hive, such as: 3.2.1")
-    private String hiveVersion;
-
-    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in 
light mode, must include hive-site.xml")
-    private String hiveConfDir;
-
-    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
-    private String hdfsPath;
-
-    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
-    private String warehouse;
-
-    @ApiModelProperty("User and group information for writing data to HDFS")
-    private String hdfsUgi;
-
-    public HiveDataNodeRequest() {
-        this.setType(DataNodeType.HIVE);
+    public ClickHouseDataNodeRequest() {
+        this.setType(DataNodeType.CLICKHOUSE);
     }
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
index 5922e61e4..3c75602f6 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
@@ -46,9 +46,6 @@ public class HiveDataNodeDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // 
thread safe
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
-
     @ApiModelProperty("Version for Hive, such as: 3.2.1")
     private String hiveVersion;
 
@@ -69,7 +66,6 @@ public class HiveDataNodeDTO {
      */
     public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request) 
throws Exception {
         return HiveDataNodeDTO.builder()
-                .jdbcUrl(request.getJdbcUrl())
                 .hiveVersion(request.getHiveVersion())
                 .hiveConfDir(request.getHiveConfDir())
                 .hdfsPath(request.getHdfsPath())
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
index ab51ed666..d7c4d6902 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
@@ -36,9 +36,6 @@ import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 @ApiModel("Hive data node request")
 public class HiveDataNodeRequest extends DataNodeRequest {
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
-
     @ApiModelProperty("Version for Hive, such as: 3.2.1")
     private String hiveVersion;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
similarity index 60%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
index 5922e61e4..b514866f2 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeDTO.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.iceberg;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -33,60 +33,45 @@ import org.slf4j.LoggerFactory;
 import javax.validation.constraints.NotNull;
 
 /**
- * Hive data node info
+ * Iceberg data node info
  */
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("Hive data node info")
-public class HiveDataNodeDTO {
+@ApiModel("Iceberg data node info")
+public class IcebergDataNodeDTO {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(HiveDataNodeDTO.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergDataNodeDTO.class);
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // 
thread safe
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    @Builder.Default
+    private String catalogType = "HIVE";
 
-    @ApiModelProperty("Version for Hive, such as: 3.2.1")
-    private String hiveVersion;
-
-    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in 
light mode, must include hive-site.xml")
-    private String hiveConfDir;
-
-    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
-    private String hdfsPath;
-
-    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    @ApiModelProperty("Iceberg data warehouse dir")
     private String warehouse;
 
-    @ApiModelProperty("User and group information for writing data to HDFS")
-    private String hdfsUgi;
-
     /**
      * Get the dto instance from the request
      */
-    public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request) 
throws Exception {
-        return HiveDataNodeDTO.builder()
-                .jdbcUrl(request.getJdbcUrl())
-                .hiveVersion(request.getHiveVersion())
-                .hiveConfDir(request.getHiveConfDir())
-                .hdfsPath(request.getHdfsPath())
+    public static IcebergDataNodeDTO getFromRequest(IcebergDataNodeRequest 
request) throws Exception {
+        return IcebergDataNodeDTO.builder()
+                .catalogType(request.getCatalogType())
                 .warehouse(request.getWarehouse())
-                .hdfsUgi(request.getHdfsUgi())
                 .build();
     }
 
     /**
      * Get the dto instance from the JSON string.
      */
-    public static HiveDataNodeDTO getFromJson(@NotNull String extParams) {
+    public static IcebergDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
             
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-            return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class);
+            return OBJECT_MAPPER.readValue(extParams, 
IcebergDataNodeDTO.class);
         } catch (Exception e) {
-            LOGGER.error("Failed to extract additional parameters for hive 
data node: ", e);
+            LOGGER.error("Failed to extract additional parameters for iceberg 
data node: ", e);
             throw new 
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
         }
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeInfo.java
similarity index 55%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeInfo.java
index ab51ed666..07ef3becc 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeInfo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.iceberg;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -23,39 +23,32 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 
 /**
- * Hive data node request
+ * Iceberg data node info
  */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.ICEBERG)
+@ApiModel("Iceberg data node info")
+public class IcebergDataNodeInfo extends DataNodeInfo {
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
 
-    @ApiModelProperty("Version for Hive, such as: 3.2.1")
-    private String hiveVersion;
-
-    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in 
light mode, must include hive-site.xml")
-    private String hiveConfDir;
-
-    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
-    private String hdfsPath;
-
-    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    @ApiModelProperty("Iceberg data warehouse dir")
     private String warehouse;
 
-    @ApiModelProperty("User and group information for writing data to HDFS")
-    private String hdfsUgi;
-
-    public HiveDataNodeRequest() {
-        this.setType(DataNodeType.HIVE);
+    public IcebergDataNodeInfo() {
+        this.setType(DataNodeType.ICEBERG);
     }
 
+    @Override
+    public IcebergDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, 
IcebergDataNodeRequest::new);
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeRequest.java
similarity index 57%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeRequest.java
index ab51ed666..19adbfb6f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/iceberg/IcebergDataNodeRequest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.node.hive;
+package org.apache.inlong.manager.pojo.node.iceberg;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -27,35 +27,23 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 
 /**
- * Hive data node request
+ * Iceberg data node request
  */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@JsonTypeDefine(value = DataNodeType.HIVE)
-@ApiModel("Hive data node request")
-public class HiveDataNodeRequest extends DataNodeRequest {
+@JsonTypeDefine(value = DataNodeType.ICEBERG)
+@ApiModel("Iceberg data node request")
+public class IcebergDataNodeRequest extends DataNodeRequest {
 
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
 
-    @ApiModelProperty("Version for Hive, such as: 3.2.1")
-    private String hiveVersion;
-
-    @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in 
light mode, must include hive-site.xml")
-    private String hiveConfDir;
-
-    @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
-    private String hdfsPath;
-
-    @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+    @ApiModelProperty("Iceberg data warehouse dir")
     private String warehouse;
 
-    @ApiModelProperty("User and group information for writing data to HDFS")
-    private String hdfsUgi;
-
-    public HiveDataNodeRequest() {
-        this.setType(DataNodeType.HIVE);
+    public IcebergDataNodeRequest() {
+        this.setType(DataNodeType.ICEBERG);
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
new file mode 100644
index 000000000..08e15de38
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.ck;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClickHouseDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.CLICKHOUSE;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        ClickHouseDataNodeInfo ckDataNodeInfo = new ClickHouseDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, ckDataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            ClickHouseDataNodeDTO dto = 
ClickHouseDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, ckDataNodeInfo);
+        }
+
+        return ckDataNodeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity 
targetEntity) {
+        ClickHouseDataNodeRequest ckDataNodeRequest = 
(ClickHouseDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(ckDataNodeRequest, targetEntity, true);
+        try {
+            ClickHouseDataNodeDTO dto = 
ClickHouseDataNodeDTO.getFromRequest(ckDataNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            LOGGER.error("failed to set entity for hive data node: ", e);
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
new file mode 100644
index 000000000..991764610
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/iceberg/IcebergDataNodeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.iceberg;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class IcebergDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.ICEBERG;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        IcebergDataNodeInfo icebergDataNodeInfo = new IcebergDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, icebergDataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            IcebergDataNodeDTO dto = 
IcebergDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, icebergDataNodeInfo);
+        }
+
+        LOGGER.debug("success to get iceberg data node from entity");
+        return icebergDataNodeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity 
targetEntity) {
+        IcebergDataNodeRequest icebergDataNodeRequest = 
(IcebergDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(icebergDataNodeRequest, targetEntity, 
true);
+        try {
+            IcebergDataNodeDTO dto = 
IcebergDataNodeDTO.getFromRequest(icebergDataNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            LOGGER.error("failed to set entity for iceberg data node: ", e);
+            throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
index f8aa955e4..ee3a00032 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
@@ -18,16 +18,21 @@
 package org.apache.inlong.manager.service.resource.sink.ck;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -51,6 +56,8 @@ public class ClickHouseResourceOperator implements 
SinkResourceOperator {
     private StreamSinkService sinkService;
     @Autowired
     private StreamSinkFieldEntityMapper clickHouseFieldMapper;
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
 
     @Override
     public Boolean accept(String sinkType) {
@@ -75,6 +82,22 @@ public class ClickHouseResourceOperator implements 
SinkResourceOperator {
         this.createTable(sinkInfo);
     }
 
+    private ClickHouseSinkDTO getClickHouseInfo(SinkInfo sinkInfo) {
+        ClickHouseSinkDTO ckInfo = 
ClickHouseSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+        // read from data node if not supplied by user
+        if (StringUtils.isBlank(ckInfo.getJdbcUrl())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "clickhouse jdbc url not 
specified and data node is empty");
+            ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
+                    dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, ckInfo);
+            ckInfo.setJdbcUrl(dataNodeInfo.getUrl());
+            ckInfo.setPassword(dataNodeInfo.getToken());
+        }
+        return ckInfo;
+    }
+
     private void createTable(SinkInfo sinkInfo) {
         LOGGER.info("begin to create clickhouse table for sinkId={}", 
sinkInfo.getId());
 
@@ -94,7 +117,7 @@ public class ClickHouseResourceOperator implements 
SinkResourceOperator {
         }
 
         try {
-            ClickHouseSinkDTO ckInfo = 
ClickHouseSinkDTO.getFromJson(sinkInfo.getExtParams());
+            ClickHouseSinkDTO ckInfo = getClickHouseInfo(sinkInfo);
             ClickHouseTableInfo tableInfo = 
ClickHouseSinkDTO.getClickHouseTableInfo(ckInfo, columnList);
             String url = ckInfo.getJdbcUrl();
             String user = ckInfo.getUsername();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
index 9542c7e03..c318634f6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
@@ -86,12 +86,7 @@ public class HiveResourceOperator implements 
SinkResourceOperator {
     }
 
     private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) {
-        HiveSinkDTO hiveInfo = new HiveSinkDTO();
-
-        if (StringUtils.isNotBlank(sinkInfo.getExtParams())) {
-            HiveSinkDTO userSinkInfo = 
HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
-            CommonBeanUtils.copyProperties(userSinkInfo, hiveInfo);
-        }
+        HiveSinkDTO hiveInfo = 
HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
 
         // read from data node if not supplied by user
         if (StringUtils.isBlank(hiveInfo.getJdbcUrl())) {
@@ -101,7 +96,6 @@ public class HiveResourceOperator implements 
SinkResourceOperator {
                     dataNodeName, sinkInfo.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, hiveInfo);
             hiveInfo.setJdbcUrl(dataNodeInfo.getUrl());
-            hiveInfo.setUsername(dataNodeInfo.getUsername());
             hiveInfo.setPassword(dataNodeInfo.getToken());
         }
         return hiveInfo;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
index fbbbc413f..1d1ce97bb 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/iceberg/IcebergResourceOperator.java
@@ -18,16 +18,21 @@
 package org.apache.inlong.manager.service.resource.sink.iceberg;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergColumnInfo;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSinkDTO;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergTableInfo;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -48,10 +53,14 @@ public class IcebergResourceOperator implements 
SinkResourceOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergResourceOperator.class);
 
+    private static final String CATALOG_TYPE_HIVE = "HIVE";
+
     @Autowired
     private StreamSinkService sinkService;
     @Autowired
     private StreamSinkFieldEntityMapper sinkFieldMapper;
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
 
     @Override
     public Boolean accept(String sinkType) {
@@ -78,11 +87,27 @@ public class IcebergResourceOperator implements 
SinkResourceOperator {
         this.createTable(sinkInfo);
     }
 
+    private IcebergSinkDTO getIcebergInfo(SinkInfo sinkInfo) {
+        IcebergSinkDTO icebergInfo = 
IcebergSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+        // read uri from data node if not supplied by user
+        if (StringUtils.isBlank(icebergInfo.getCatalogUri())
+                && CATALOG_TYPE_HIVE.equals(icebergInfo.getCatalogType())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "iceberg catalog uri not 
specified and data node is empty");
+            IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
+                    dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, icebergInfo);
+            icebergInfo.setCatalogUri(dataNodeInfo.getUrl());
+        }
+        return icebergInfo;
+    }
+
     private void createTable(SinkInfo sinkInfo) {
         LOGGER.info("begin to create iceberg table for sinkInfo={}", sinkInfo);
 
         // Get all info from config
-        IcebergSinkDTO icebergInfo = 
IcebergSinkDTO.getFromJson(sinkInfo.getExtParams());
+        IcebergSinkDTO icebergInfo = getIcebergInfo(sinkInfo);
         List<IcebergColumnInfo> columnInfoList = getColumnList(sinkInfo);
         if (CollectionUtils.isEmpty(columnInfoList)) {
             throw new IllegalArgumentException("no iceberg columns specified");
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index e4db70a34..db6cd23aa 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -57,6 +58,8 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
     protected StreamSinkEntityMapper sinkMapper;
     @Autowired
     protected StreamSinkFieldEntityMapper sinkFieldMapper;
+    @Autowired
+    protected DataNodeOperateHelper dataNodeHelper;
 
     /**
      * Setting the parameters of the latest entity.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
index 9442e7cb2..b5814ddff 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.manager.service.sink.ck;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -82,6 +84,15 @@ public class ClickHouseSinkOperator extends 
AbstractSinkOperator {
         }
 
         ClickHouseSinkDTO dto = 
ClickHouseSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            Preconditions.checkNotEmpty(entity.getDataNodeName(),
+                    "clickhouse jdbc url unspecified and data node is empty");
+            ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
index 0c630c955..67b9e133a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperator.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.manager.service.sink.hive;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,15 @@ public class HiveSinkOperator extends AbstractSinkOperator {
         }
 
         HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            Preconditions.checkNotEmpty(entity.getDataNodeName(),
+                    "hive jdbc url unspecified and data node is empty");
+            HiveDataNodeInfo dataNodeInfo = (HiveDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
index f5fc4e8d4..e8da4f8a4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperator.java
@@ -18,10 +18,12 @@
 package org.apache.inlong.manager.service.sink.iceberg;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -48,6 +50,8 @@ public class IcebergSinkOperator extends AbstractSinkOperator 
{
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergSinkOperator.class);
 
+    private static final String CATALOG_TYPE_HIVE = "HIVE";
+
     @Autowired
     private ObjectMapper objectMapper;
 
@@ -83,6 +87,15 @@ public class IcebergSinkOperator extends 
AbstractSinkOperator {
         }
 
         IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getCatalogUri()) && 
CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
+            Preconditions.checkNotEmpty(entity.getDataNodeName(),
+                    "iceberg catalog uri unspecified and data node is empty");
+            IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setCatalogUri(dataNodeInfo.getUrl());
+        }
+
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
index 428dd4f63..efd3311e8 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
@@ -48,7 +48,6 @@ public class DataNodeServiceTest extends ServiceBaseTest {
         request.setToken(password);
         request.setDescription("test cluster");
         request.setInCharges(GLOBAL_OPERATOR);
-        request.setJdbcUrl("127.0.0.1");
         request.setToken("123456");
         return dataNodeService.save(request, GLOBAL_OPERATOR);
     }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index 274bf9204..6197d4e81 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -38,6 +38,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
     private final String globalStreamId = "stream1";
     private final String globalOperator = "admin";
     private final String sinkName = "default";
+    private final String jdbcUrl = "127.0.0.1:8080";
 
     @Autowired
     private StreamSinkService sinkService;
@@ -56,6 +57,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setSinkType(SinkType.HIVE);
         
sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
         sinkInfo.setSinkName(sinkName);
+        sinkInfo.setJdbcUrl(jdbcUrl);
         return sinkService.save(sinkInfo, globalOperator);
     }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
index fa3900163..59ed957be 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
@@ -56,6 +56,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
         sinkInfo.setSinkName(sinkName);
         sinkInfo.setId((int) (Math.random() * 100000 + 1));
+        sinkInfo.setCatalogUri("thrift://127.0.0.1:9000");
         return sinkService.save(sinkInfo, globalOperator);
     }
 

Reply via email to