This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e493b79d8 [INLONG-6785][Manager] Support register and manage the resource of Apache Hudi (#6790)
e493b79d8 is described below

commit e493b79d8a68d53f7fcf745b4394e14b25c65758
Author: averyzhang <[email protected]>
AuthorDate: Tue Dec 13 16:18:48 2022 +0800

    [INLONG-6785][Manager] Support register and manage the resource of Apache Hudi (#6790)
---
 .../inlong/manager/client/File2HudiExample.java    | 160 ++++++++++++++
 .../inlong/manager/common/consts/DataNodeType.java |   1 +
 .../inlong/manager/common/consts/SinkType.java     |   1 +
 .../plugin/flink/enums/ConnectorJarType.java       |   2 +
 .../plugin/listener/RestartStreamListener.java     |  10 +-
 .../manager/pojo/node/hudi/HudiDataNodeDTO.java    |  74 +++++++
 .../manager/pojo/node/hudi/HudiDataNodeInfo.java   |  54 +++++
 .../pojo/node/hudi/HudiDataNodeRequest.java        |  49 +++++
 .../manager/pojo/sink/hudi/HudiColumnInfo.java     |  78 +++++++
 .../manager/pojo/sink/hudi/HudiPartition.java}     |  40 +++-
 .../pojo/sink/hudi/HudiPartitionField.java}        |  27 ++-
 .../inlong/manager/pojo/sink/hudi/HudiSink.java    |  91 ++++++++
 .../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 132 +++++++++++
 .../manager/pojo/sink/hudi/HudiSinkRequest.java    |  70 ++++++
 .../manager/pojo/sink/hudi/HudiTableInfo.java}     |  24 +-
 .../inlong/manager/pojo/sink/hudi/HudiType.java    |  60 +++++
 .../manager/pojo/sort/util/LoadNodeUtils.java      |  38 ++++
 inlong-manager/manager-service/pom.xml             |   4 +
 .../service/node/hudi/HudiDataNodeOperator.java    |  86 ++++++++
 .../resource/sink/hudi/HudiCatalogClient.java      | 242 +++++++++++++++++++++
 .../resource/sink/hudi/HudiResourceOperator.java   | 177 +++++++++++++++
 .../service/sink/hudi/HudiSinkOperator.java        | 130 +++++++++++
 .../manager/service/sink/HudiSinkServiceTest.java  |  94 ++++++++
 .../sort/protocol/constant/HudiConstant.java       |  56 +++++
 .../sort/protocol/node/load/HudiLoadNode.java      |  33 ++-
 .../sort/protocol/node/load/HudiLoadNodeTest.java  |   2 +-
 .../inlong/sort/parser/HudiNodeSqlParserTest.java  |   2 +-
 27 files changed, 1695 insertions(+), 42 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
new file mode 100644
index 000000000..3ac40649b
--- /dev/null
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiPartition;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.source.file.FileSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.shiro.util.Assert;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for file to Hudi.
+ */
+@Slf4j
+@Disabled
+public class File2HudiExample extends BaseExample {
+
+    @Test
+    public void testCreateGroupForHudi() {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(super.getInlongAuth());
+        InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
+
+        InlongGroupInfo groupInfo = super.createGroupInfo();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupInfo);
+            InlongStreamBuilder streamBuilder = group.createStream(createStreamInfo());
+            streamBuilder.fields(createStreamFields());
+            streamBuilder.source(createAgentFileSource());
+            streamBuilder.sink(createHudiSink());
+            streamBuilder.initOrUpdate();
+            // start group
+            InlongGroupContext inlongGroupContext = group.init();
+            Assert.notNull(inlongGroupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testStopGroup() {
+        ClientConfiguration configuration = new ClientConfiguration();
+        configuration.setWriteTimeout(10);
+        configuration.setReadTimeout(10);
+        configuration.setConnectTimeout(10);
+        configuration.setTimeUnit(TimeUnit.SECONDS);
+        configuration.setAuthentication(super.getInlongAuth());
+        InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
+        InlongGroupInfo groupInfo = createGroupInfo();
+        try {
+            InlongGroup group = inlongClient.forGroup(groupInfo);
+            InlongGroupContext groupContext = group.delete();
+            Assert.notNull(groupContext);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private FileSource createAgentFileSource() {
+        FileSource fileSource = new FileSource();
+        fileSource.setSourceName("{source.name}");
+        fileSource.setAgentIp("{agent.ip}");
+        fileSource.setPattern("/a/b/*.txt");
+        fileSource.setTimeOffset("-1h");
+        return fileSource;
+    }
+
+    private List<StreamField> createStreamFields() {
+        List<StreamField> streamFieldList = Lists.newArrayList();
+        streamFieldList.add(new StreamField(0, FieldType.STRING.toString(), "name", null, null));
+        streamFieldList.add(new StreamField(1, FieldType.INT.toString(), "age", null, null));
+        streamFieldList.add(new StreamField(2, FieldType.DECIMAL.toString(), "score", null, null));
+        streamFieldList.add(new StreamField(3, FieldType.TIMESTAMP.toString(), "ts", null, null));
+        return streamFieldList;
+    }
+
+    /**
+     * Create Hudi sink
+     */
+    public HudiSink createHudiSink() {
+        HudiSink sink = new HudiSink();
+
+        sink.setSinkName("{sink.name}");
+        sink.setDbName("{db.name}");
+        sink.setTableName("{table.name}");
+        sink.setCatalogUri("thrift://{ip:port}");
+        sink.setWarehouse("hdfs://{ip:port}/user/hudi/warehouse/");
+
+        final SinkField field1 = new SinkField(0, FieldType.INT.toString(), "age", FieldType.INT.toString(), "age");
+        final SinkField field2 = new SinkField(1, FieldType.STRING.toString(), "name", FieldType.STRING.toString(),
+                "name");
+        final SinkField field3 = new SinkField(2, FieldType.DECIMAL.toString(), "score", FieldType.DECIMAL.toString(),
+                "score");
+        final SinkField field4 = new SinkField(3, FieldType.TIMESTAMP.toString(), "ts", FieldType.TIMESTAMP.toString(),
+                "ts");
+
+        // field ext param
+        // field1: bucket partition example
+        HudiColumnInfo info1 = new HudiColumnInfo();
+        info1.setRequired(true);
+        info1.setPartitionStrategy(HudiPartition.BUCKET.toString());
+        info1.setBucketNum(10);
+        field1.setExtParams(JsonUtils.toJsonString(info1));
+
+        // field3: decimal column example
+        HudiColumnInfo info3 = new HudiColumnInfo();
+        info3.setScale(5);
+        info3.setPrecision(10); // scale must be less than or equal to precision
+        field3.setExtParams(JsonUtils.toJsonString(info3));
+
+        // field4: hour partition example
+        HudiColumnInfo info4 = new HudiColumnInfo();
+        info4.setPartitionStrategy(HudiPartition.HOUR.toString());
+        field4.setExtParams(JsonUtils.toJsonString(info4));
+
+        List<SinkField> fields = new ArrayList<>();
+        fields.add(field1);
+        fields.add(field2);
+        fields.add(field3);
+        fields.add(field4);
+        sink.setSinkFieldList(fields);
+        return sink;
+    }
+}
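
For reference, a minimal sketch filling in the placeholders used by createHudiSink above; every endpoint and name below is hypothetical:

    HudiSink sink = new HudiSink();
    sink.setSinkName("hudi_sink_demo");                              // hypothetical, replaces {sink.name}
    sink.setDbName("demo_db");                                       // hypothetical, replaces {db.name}
    sink.setTableName("demo_table");                                 // hypothetical, replaces {table.name}
    sink.setCatalogUri("thrift://127.0.0.1:9083");                   // hypothetical metastore address
    sink.setWarehouse("hdfs://127.0.0.1:9000/user/hudi/warehouse/"); // hypothetical warehouse dir
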
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 691a8db8e..16fa92c77 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -25,6 +25,7 @@ public class DataNodeType {
     public static final String HIVE = "HIVE";
     public static final String KAFKA = "KAFKA";
     public static final String ICEBERG = "ICEBERG";
+    public static final String HUDI = "HUDI";
     public static final String CLICKHOUSE = "CLICKHOUSE";
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
     public static final String MYSQL = "MYSQL";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index 28899c7de..a2da09886 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -25,6 +25,7 @@ public class SinkType {
     public static final String HIVE = "HIVE";
     public static final String KAFKA = "KAFKA";
     public static final String ICEBERG = "ICEBERG";
+    public static final String HUDI = "HUDI";
     public static final String CLICKHOUSE = "CLICKHOUSE";
     public static final String HBASE = "HBASE";
     public static final String POSTGRESQL = "POSTGRESQL";
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
index bd027c75c..de2259f67 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -69,6 +69,8 @@ public enum ConnectorJarType {
 
     ICEBERG_SINK("icebergLoad", "iceberg"),
 
+    HUDI_SINK("hudiLoad", "hudi"),
+
     HDFS_SINK("fileSystemLoad", ""),
 
     ;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index f0469d188..dc56ab157 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.listener;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
@@ -89,9 +90,12 @@ public class RestartStreamListener implements SortOperateListener {
 
         Map<String, String> kvConf = new HashMap<>();
         groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
-        streamExtList.forEach(extInfo -> {
-            kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
-        });
+        // There is a possibility that the extList value is null
+        if (CollectionUtils.isNotEmpty(streamExtList)) {
+            streamExtList.forEach(extInfo -> {
+                kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+            });
+        }
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeDTO.java
new file mode 100644
index 000000000..53a71e81c
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeDTO.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Hudi data node info")
+public class HudiDataNodeDTO {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HudiDataNodeDTO.class);
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    @Builder.Default
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Hudi data warehouse dir")
+    private String warehouse;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static HudiDataNodeDTO getFromRequest(HudiDataNodeRequest request) throws Exception {
+        return HudiDataNodeDTO.builder()
+                .catalogType(request.getCatalogType())
+                .warehouse(request.getWarehouse())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static HudiDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, HudiDataNodeDTO.class);
+        } catch (Exception e) {
+            LOGGER.error("Failed to extract additional parameters for Hudi data node: ", e);
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
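
A hedged sketch of the extParams round trip this DTO enables, using the JsonUtils helper from the same module; the warehouse path is hypothetical:

    HudiDataNodeDTO dto = HudiDataNodeDTO.builder()
            .catalogType("HIVE")
            .warehouse("hdfs://127.0.0.1:9000/user/hudi/warehouse/") // hypothetical path
            .build();
    String extParams = JsonUtils.toJsonString(dto);                  // stored on the data node entity
    HudiDataNodeDTO parsed = HudiDataNodeDTO.getFromJson(extParams); // throws BusinessException on bad JSON
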
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeInfo.java
new file mode 100644
index 000000000..4b8152b3d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node info")
+public class HudiDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Hudi data warehouse dir")
+    private String warehouse;
+
+    public HudiDataNodeInfo() {
+        this.setType(DataNodeType.HUDI);
+    }
+
+    @Override
+    public HudiDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, HudiDataNodeRequest::new);
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeRequest.java
new file mode 100644
index 000000000..e7962ef6d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hudi data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node request")
+public class HudiDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Hudi data warehouse dir")
+    private String warehouse;
+
+    public HudiDataNodeRequest() {
+        this.setType(DataNodeType.HUDI);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiColumnInfo.java
new file mode 100644
index 000000000..7af82b2d2
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiColumnInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+/**
+ * Hudi column info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class HudiColumnInfo {
+
+    @ApiModelProperty("Length of fixed type")
+    private Integer length;
+
+    @ApiModelProperty("Precision of decimal type")
+    private Integer precision;
+
+    @ApiModelProperty("Scale of decimal type")
+    private Integer scale;
+
+    @ApiModelProperty("Field partition strategy, including: None, Identity, 
Year, Month, Day, Hour, Bucket, Truncate")
+    private String partitionStrategy;
+
+    @ApiModelProperty("Bucket num param of bucket partition")
+    private Integer bucketNum;
+
+    @ApiModelProperty("Width param of truncate partition")
+    private Integer width;
+
+    // The following are populated from the base field and need not be part of the extra-param API
+    private String name;
+    private String type;
+    private String desc;
+    private boolean required;
+
+    private boolean isPartition;
+
+    /**
+     * Get the extra param from the Json
+     */
+    public static HudiColumnInfo getFromJson(String extParams) {
+        if (StringUtils.isEmpty(extParams)) {
+            return new HudiColumnInfo();
+        }
+        try {
+            return JsonUtils.parseObject(extParams, HudiColumnInfo.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartition.java
similarity index 50%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartition.java
index 691a8db8e..cec8e0f48 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartition.java
@@ -15,18 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import org.apache.inlong.manager.common.util.Preconditions;
 
 /**
- * Constants of data node.
+ * Hudi partition type
  */
-public class DataNodeType {
+public enum HudiPartition {
+
+    IDENTITY,
+    BUCKET,
+    TRUNCATE,
+    YEAR,
+    MONTH,
+    DAY,
+    HOUR,
+    NONE,
+    ;
 
-    public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String MYSQL = "MYSQL";
+    /**
+     * Get partition type from name
+     */
+    public static HudiPartition forName(String name) {
+        Preconditions.checkNotNull(name, "HudiPartition should not be null");
+        for (HudiPartition value : values()) {
+            if (value.toString().equalsIgnoreCase(name)) {
+                return value;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Unsupported 
HudiPartition : %s", name));
+    }
 
+    @Override
+    public String toString() {
+        return name();
+    }
 }
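
A usage sketch of the new enum: forName matches case-insensitively and rejects unknown names:

    HudiPartition hour = HudiPartition.forName("hour");     // -> HudiPartition.HOUR
    HudiPartition bucket = HudiPartition.forName("BUCKET"); // -> HudiPartition.BUCKET
    HudiPartition.forName("week");                          // throws IllegalArgumentException
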
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartitionField.java
similarity index 56%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartitionField.java
index 691a8db8e..c4e354c41 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartitionField.java
@@ -15,18 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
 /**
- * Constants of data node.
+ * Hudi partition field info
  */
-public class DataNodeType {
+@Data
+@ApiModel("Hudi partition field")
+public class HudiPartitionField {
+
+    @ApiModelProperty("Field name")
+    private String fieldName;
+
+    @ApiModelProperty("Field type")
+    private String fieldType;
 
-    public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String MYSQL = "MYSQL";
+    @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS, 
SECONDS, SQL, ISO_8601"
+            + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used 
for time format")
+    private String fieldFormat;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
new file mode 100644
index 000000000..54e332f88
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashMap;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Hudi sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Hudi sink info")
+@JsonTypeDefine(value = SinkType.HUDI)
+public class HudiSink extends StreamSink {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    @Builder.Default
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+    private String catalogUri;
+
+    @ApiModelProperty("Hudi data warehouse dir")
+    private String warehouse;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Data path, such as: 
hdfs://ip:port/user/hive/warehouse/test.db")
+    private String dataPath;
+
+    @ApiModelProperty("File format, support: Parquet, Orc, Avro")
+    private String fileFormat;
+
+    @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month, 
O-once, R-regulation")
+    private String partitionType;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Extended properties")
+    private List<HashMap<String, String>> extList;
+
+    @ApiModelProperty("Partition field list")
+    private List<HudiPartitionField> partitionFieldList;
+
+    public HudiSink() {
+        this.setSinkType(SinkType.HUDI);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, HudiSinkRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
new file mode 100644
index 000000000..616fc56f7
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+/**
+ * Hudi sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class HudiSinkDTO {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    @Builder.Default
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+    private String catalogUri;
+
+    @ApiModelProperty("Hudi data warehouse dir")
+    private String warehouse;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Data path, such as: 
hdfs://ip:port/user/hive/warehouse/test.db")
+    private String dataPath;
+
+    @ApiModelProperty("File format, support: Parquet, Orc, Avro")
+    private String fileFormat;
+
+    @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month, 
O-once, R-regulation")
+    private String partitionType;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Properties for Hudi")
+    private Map<String, Object> properties;
+
+    @ApiModelProperty("Extended properties")
+    private List<HashMap<String, String>> extList;
+
+    @ApiModelProperty("Partition field list")
+    private List<HudiPartitionField> partitionFieldList;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static HudiSinkDTO getFromRequest(HudiSinkRequest request) {
+        return HudiSinkDTO.builder()
+                .catalogUri(request.getCatalogUri())
+                .warehouse(request.getWarehouse())
+                .dbName(request.getDbName())
+                .tableName(request.getTableName())
+                .dataPath(request.getDataPath())
+                .partitionFieldList(request.getPartitionFieldList())
+                .fileFormat(request.getFileFormat())
+                .catalogType(request.getCatalogType())
+                .properties(request.getProperties())
+                .extList(request.getExtList())
+                .primaryKey(request.getPrimaryKey())
+                .build();
+    }
+
+    public static HudiSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, HudiSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+    /**
+     * Get Hudi table info
+     */
+    public static HudiTableInfo getHudiTableInfo(HudiSinkDTO hudiInfo, List<HudiColumnInfo> columnList) {
+        HudiTableInfo tableInfo = new HudiTableInfo();
+        tableInfo.setDbName(hudiInfo.getDbName());
+        tableInfo.setTableName(hudiInfo.getTableName());
+
+        // Set partition fields
+        if (CollectionUtils.isNotEmpty(hudiInfo.getPartitionFieldList())) {
+            for (HudiPartitionField field : hudiInfo.getPartitionFieldList()) {
+                HudiColumnInfo columnInfo = new HudiColumnInfo();
+                columnInfo.setName(field.getFieldName());
+                columnInfo.setPartition(true);
+                columnInfo.setType("string");
+                columnList.add(columnInfo);
+            }
+        }
+        tableInfo.setColumns(columnList);
+        tableInfo.setPrimaryKey(hudiInfo.getPrimaryKey());
+        tableInfo.setFileFormat(hudiInfo.getFileFormat());
+        tableInfo.setTblProperties(hudiInfo.getProperties());
+        return tableInfo;
+    }
+
+}
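
A hedged sketch of getHudiTableInfo: each partition field on the DTO is appended to the caller-supplied column list as a string column flagged as a partition column:

    HudiSinkDTO dto = HudiSinkDTO.getFromJson(extParams);  // extParams as stored on the sink entity
    List<HudiColumnInfo> columns = new ArrayList<>();      // regular (non-partition) columns go here
    HudiTableInfo tableInfo = HudiSinkDTO.getHudiTableInfo(dto, columns);
    // tableInfo now carries dbName, tableName, fileFormat, primaryKey, tblProperties,
    // plus one string partition column per entry in dto.getPartitionFieldList().
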
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
new file mode 100644
index 000000000..a51d0a153
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashMap;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Hudi sink request.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Hudi sink request")
+@JsonTypeDefine(value = SinkType.HUDI)
+public class HudiSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+    private String catalogType = "HIVE";
+
+    @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+    private String catalogUri;
+
+    @ApiModelProperty("Hudi data warehouse dir")
+    private String warehouse;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Data path, such as: 
hdfs://ip:port/user/hive/warehouse/test.db")
+    private String dataPath;
+
+    @ApiModelProperty("File format, support: Parquet, Orc, Avro")
+    private String fileFormat;
+
+    @ApiModelProperty("Extended properties")
+    private List<HashMap<String, String>> extList;
+
+    @ApiModelProperty("Partition field list")
+    private List<HudiPartitionField> partitionFieldList;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
similarity index 65%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
index 691a8db8e..24e4d7260 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
@@ -15,18 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
 
 /**
- * Constants of data node.
+ * Hudi table info
  */
-public class DataNodeType {
+@Data
+public class HudiTableInfo {
 
-    public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String MYSQL = "MYSQL";
+    private String dbName;
+    private String tableName;
+    private String tableDesc;
+    private String fileFormat;
+    private Map<String, Object> tblProperties;
+    private List<HudiColumnInfo> columns;
 
+    private String primaryKey;
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
new file mode 100644
index 000000000..672b07f8f
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import lombok.Getter;
+
+/**
+ * Hudi data type
+ */
+public enum HudiType {
+
+    BOOLEAN("boolean"),
+    INT("int"),
+    LONG("long"),
+    FLOAT("float"),
+    DOUBLE("double"),
+    DECIMAL("decimal"),
+    DATE("date"),
+    TIME("time"),
+    TIMESTAMP("timestamp"),
+    TIMESTAMPTZ("timestamptz"),
+    STRING("string"),
+    UUID("uuid"),
+    FIXED("fixed"),
+    BINARY("binary");
+
+    @Getter
+    private final String type;
+
+    HudiType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * Get type from name
+     */
+    public static HudiType forType(String type) {
+        for (HudiType ibType : values()) {
+            if (ibType.getType().equalsIgnoreCase(type)) {
+                return ibType;
+            }
+        }
+        throw new IllegalArgumentException(String.format("invalid hudi type = 
%s", type));
+    }
+}
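
A usage sketch of the type lookup, which also matches case-insensitively:

    HudiType decimal = HudiType.forType("DECIMAL"); // -> HudiType.DECIMAL
    String name = decimal.getType();                // "decimal"
    HudiType.forType("varchar");                    // throws IllegalArgumentException
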
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 37cf239bc..9bbed161f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
 import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
 import org.apache.inlong.manager.pojo.sink.hive.HivePartitionField;
 import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
 import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
@@ -46,6 +47,7 @@ import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.sort.formats.common.StringTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HudiConstant;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
@@ -62,6 +64,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -122,6 +125,8 @@ public class LoadNodeUtils {
                 return createLoadNode((ClickHouseSink) streamSink, fieldInfos, fieldRelations, properties);
             case SinkType.ICEBERG:
                 return createLoadNode((IcebergSink) streamSink, fieldInfos, fieldRelations, properties);
+            case SinkType.HUDI:
+                return createLoadNode((HudiSink) streamSink, fieldInfos, fieldRelations, properties);
             case SinkType.SQLSERVER:
                 return createLoadNode((SQLServerSink) streamSink, fieldInfos, fieldRelations, properties);
             case SinkType.ELASTICSEARCH:
@@ -398,6 +403,39 @@ public class LoadNodeUtils {
                 icebergSink.getWarehouse());
     }
 
+    /**
+     * Create load node of Hudi.
+     */
+    public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
+        HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
+        List<FieldInfo> partitionFields = Lists.newArrayList();
+        if (CollectionUtils.isNotEmpty(hudiSink.getPartitionFieldList())) {
+            partitionFields = hudiSink.getPartitionFieldList().stream()
+                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hudiSink.getSinkName(),
+                            FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
+                                    partitionField.getFieldFormat())))
+                    .collect(Collectors.toList());
+        }
+        return new HudiLoadNode(
+                hudiSink.getSinkName(),
+                hudiSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                hudiSink.getDbName(),
+                hudiSink.getTableName(),
+                hudiSink.getPrimaryKey(),
+                catalogType,
+                hudiSink.getCatalogUri(),
+                hudiSink.getWarehouse(),
+                hudiSink.getExtList(),
+                partitionFields);
+    }
+
     /**
      * Create load node of SQLServer.
      */
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 8b8d6a33f..72d8d9cbc 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -557,5 +557,9 @@
             <artifactId>flink-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink1.13-bundle</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hudi/HudiDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hudi/HudiDataNodeOperator.java
new file mode 100644
index 000000000..79b253cd9
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hudi/HudiDataNodeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.hudi;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class HudiDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HudiDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.HUDI;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        HudiDataNodeInfo hudiDataNodeInfo = new HudiDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, hudiDataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            HudiDataNodeDTO dto = HudiDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, hudiDataNodeInfo);
+        }
+
+        LOGGER.debug("success to get Hudi data node from entity");
+        return hudiDataNodeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+        HudiDataNodeRequest hudiDataNodeRequest = (HudiDataNodeRequest) request;
+        CommonBeanUtils.copyProperties(hudiDataNodeRequest, targetEntity, true);
+        try {
+            HudiDataNodeDTO dto = HudiDataNodeDTO.getFromRequest(hudiDataNodeRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            LOGGER.error("failed to set entity for Hudi data node: ", e);
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
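
A hedged sketch of how this operator is picked up; the surrounding service wiring and the entity variable are assumptions, not part of this patch:

    // Assumed caller: a service holding all the data node operator beans.
    for (HudiDataNodeOperator operator : operators) {       // operators: assumed injected list
        if (operator.accept(DataNodeType.HUDI)) {
            // entity: an assumed DataNodeEntity whose extParams holds a HudiDataNodeDTO
            DataNodeInfo info = operator.getFromEntity(entity);
            break;
        }
    }
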
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
new file mode 100644
index 000000000..124dcaedb
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.hudi;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hudi.sync.common.util.ConfigUtils;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiTableInfo;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Catalog client for Hudi.
+ */
+public class HudiCatalogClient {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HudiCatalogClient.class);
+
+    private final String uri;
+    private final String dbName;
+    private final String warehouse;
+    private IMetaStoreClient client;
+    private final HiveConf hiveConf;
+
+    public HudiCatalogClient(String uri, String warehouse, String dbName) throws MetaException {
+        this.uri = uri;
+        this.warehouse = warehouse;
+        this.dbName = dbName;
+        hiveConf = new HiveConf();
+        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, uri);
+        hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, false);
+    }
+
+    /**
+     * Open the hive metastore connection
+     */
+    public void open() {
+        if (this.client == null) {
+            try {
+                this.client = Hive.get(hiveConf).getMSC();
+            } catch (Exception e) {
+                throw new HoodieCatalogException("Failed to create hive metastore client", e);
+            }
+            LOG.info("Connected to Hive metastore");
+        }
+    }
+
+    private void createDatabase(String warehouse, Map<String, String> meta, boolean ignoreIfExists) {
+        Database database = new Database();
+        Map<String, String> parameter = Maps.newHashMap();
+        database.setName(dbName);
+        // default location: <warehouse>/<dbName>.db, unless overridden by the 'location' meta key
+        database.setLocationUri(new Path(warehouse, dbName) + ".db");
+        meta.forEach((key, value) -> {
+            if (key.equals("comment")) {
+                database.setDescription(value);
+            } else if (key.equals("location")) {
+                database.setLocationUri(value);
+            } else if (value != null) {
+                parameter.put(key, value);
+            }
+        });
+        database.setParameters(parameter);
+        try {
+            client.createDatabase(database);
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new RuntimeException("Database '" + dbName + "' already exists!", e);
+            }
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create database '" + dbName
+                    + "'", e);
+        }
+    }
+
+    /**
+     * Create the hudi database
+     * @param warehouse the warehouse directory in dfs
+     * @param ignoreIfExists whether to skip creation if the database already exists
+     */
+    public void createDatabase(String warehouse, boolean ignoreIfExists) {
+        createDatabase(warehouse, Maps.newHashMap(), ignoreIfExists);
+    }
+
+    /**
+     * Check whether the given table exists
+     * @param tableName the name of the Hudi table
+     * @return true if the table exists
+     */
+    public boolean tableExist(String tableName) throws TException {
+        return client.tableExists(dbName, tableName);
+    }
+
+    /**
+     * Get the column info of an existing Hudi table
+     * @param dbName the database name
+     * @param tableName the table name
+     */
+    public List<HudiColumnInfo> getColumns(
+            String dbName,
+            String tableName)
+            throws TException {
+        Table hiveTable = client.getTable(dbName, tableName);
+        List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
+                // filter out the metadata columns
+                .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
+                .collect(Collectors.toList());
+        allCols.addAll(hiveTable.getPartitionKeys());
+
+        return allCols.stream()
+                .map((FieldSchema s) -> {
+                    HudiColumnInfo info = new HudiColumnInfo();
+                    info.setName(s.getName());
+                    info.setType(s.getType());
+                    return info;
+                })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Append the given columns to the tail of the Hudi table schema
+     */
+    public void addColumns(String tableName, List<HudiColumnInfo> columns) throws TException {
+        Table hiveTable = client.getTable(dbName, tableName);
+        Table newHiveTable = hiveTable.deepCopy();
+        List<FieldSchema> cols = newHiveTable.getSd().getCols();
+        for (HudiColumnInfo column : columns) {
+            FieldSchema fieldSchema = new FieldSchema();
+            fieldSchema.setName(column.getName());
+            fieldSchema.setType(column.getType());
+            fieldSchema.setComment(column.getDesc());
+            cols.add(fieldSchema);
+        }
+        newHiveTable.getSd().setCols(cols);
+        client.alter_table(dbName, tableName, newHiveTable);
+    }
+
+    /**
+     * Create hudi table and register to hive metastore
+     * @param tableName the hudi table name
+     * @param tableInfo the hudi table info
+     * @param useRealTimeInputFormat whether to use the real-time input format when reading
+     */
+    public void createTable(
+            String tableName,
+            HudiTableInfo tableInfo,
+            boolean useRealTimeInputFormat)
+            throws TException, IOException {
+        Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(dbName, tableName);
+        hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
+        hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+        Map<String, String> properties = new HashMap<>();
+        String location = this.warehouse + "/" + dbName + ".db" + "/" + tableName;
+        properties.put("path", location);
+
+        List<FieldSchema> cols = new ArrayList<>();
+        for (HudiColumnInfo column : tableInfo.getColumns()) {
+            FieldSchema fieldSchema = new FieldSchema();
+            fieldSchema.setName(column.getName());
+            fieldSchema.setType(column.getType());
+            fieldSchema.setComment(column.getDesc());
+            cols.add(fieldSchema);
+        }
+
+        // Build storage of hudi table
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(cols);
+        hiveTable.setDbName(dbName);
+        hiveTable.setTableName(tableName);
+        // FIXME: splitSchemas needs to be configurable from the frontend
+
+        HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
+        // resolve the input format class: real-time or read-optimized, per the flag
+        String inputFormatClassName =
+                HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
+        String outputFormatClassName = HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
+        String serDeClassName = HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+        sd.setInputFormat(inputFormatClassName);
+        sd.setOutputFormat(outputFormatClassName);
+
+        Map<String, String> serdeProperties = new HashMap<>();
+        serdeProperties.put("path", location);
+        serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
+        sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
+        sd.setLocation(location);
+        hiveTable.setSd(sd);
+
+        hiveTable.setParameters(properties);
+
+        client.createTable(hiveTable);
+    }
+
+    /**
+     * Close the connection to the Hive metastore
+     */
+    public void close() {
+        if (client != null) {
+            client.close();
+            client = null;
+            LOG.info("Disconnect to hive metastore");
+        }
+    }
+
+}
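
For reference, a minimal usage sketch of the catalog client above. The metastore URI, warehouse path, database, table, and column values are illustrative placeholders, and the HudiTableInfo/HudiColumnInfo setters are assumed to follow the Lombok-style accessors used elsewhere in this commit:

    import java.util.Collections;
    import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
    import org.apache.inlong.manager.pojo.sink.hudi.HudiTableInfo;
    import org.apache.inlong.manager.service.resource.sink.hudi.HudiCatalogClient;

    public class HudiCatalogClientSketch {

        public static void main(String[] args) throws Exception {
            // placeholder metastore URI, warehouse path, and names
            HudiCatalogClient client = new HudiCatalogClient(
                    "thrift://127.0.0.1:9083", "hdfs://ns1/warehouse", "demo_db");
            try {
                client.open();
                // create the database if absent, then the table if absent
                client.createDatabase("hdfs://ns1/warehouse", true);
                if (!client.tableExist("demo_table")) {
                    HudiColumnInfo idColumn = new HudiColumnInfo();
                    idColumn.setName("id");
                    idColumn.setType("bigint");
                    HudiTableInfo tableInfo = new HudiTableInfo();
                    tableInfo.setColumns(Collections.singletonList(idColumn));
                    client.createTable("demo_table", tableInfo, true);
                }
            } finally {
                client.close();
            }
        }
    }
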
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiResourceOperator.java
new file mode 100644
index 000000000..17e0ed879
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiResourceOperator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.hudi;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkDTO;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Hudi resource operator
+ */
+@Service
+public class HudiResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HudiResourceOperator.class);
+
+    private static final String CATALOG_TYPE_HIVE = "HIVE";
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.HUDI.equals(sinkType);
+    }
+
+    /**
+     * Create Hudi table according to the sink config
+     */
+    public void createSinkResource(SinkInfo sinkInfo) {
+        if (sinkInfo == null) {
+            LOGGER.warn("sink info was null, skip to create resource");
+            return;
+        }
+
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already succeeded, skip creation");
+            return;
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOGGER.warn("resource creation is disabled, skip creation for [" + sinkInfo.getId() + "]");
+            return;
+        }
+
+        this.createTableIfAbsent(sinkInfo);
+    }
+
+    private HudiSinkDTO getHudiInfo(SinkInfo sinkInfo) {
+        HudiSinkDTO hudiInfo = HudiSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+        // read the catalog URI from the data node if not supplied by the user
+        if (StringUtils.isBlank(hudiInfo.getCatalogUri())
+                && CATALOG_TYPE_HIVE.equals(hudiInfo.getCatalogType())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "Hudi catalog uri not specified and data node is empty");
+            HudiDataNodeInfo dataNodeInfo = (HudiDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                    dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, hudiInfo);
+            hudiInfo.setCatalogUri(dataNodeInfo.getUrl());
+        }
+
+        hudiInfo.setDataPath(hudiInfo.getWarehouse() + "/" + hudiInfo.getDbName() + ".db/" + hudiInfo.getTableName());
+        return hudiInfo;
+    }
+
+    private void createTableIfAbsent(SinkInfo sinkInfo) {
+        LOGGER.info("begin to create hudi table for sinkInfo={}", sinkInfo);
+
+        // Get all info from config
+        HudiSinkDTO hudiInfo = getHudiInfo(sinkInfo);
+        List<HudiColumnInfo> columnInfoList = getColumnList(sinkInfo);
+        if (CollectionUtils.isEmpty(columnInfoList)) {
+            throw new IllegalArgumentException("no hudi columns specified");
+        }
+        HudiTableInfo tableInfo = HudiSinkDTO.getHudiTableInfo(hudiInfo, columnInfoList);
+
+        String metastoreUri = hudiInfo.getCatalogUri();
+        String warehouse = hudiInfo.getWarehouse();
+        String dbName = hudiInfo.getDbName();
+        String tableName = hudiInfo.getTableName();
+
+        HudiCatalogClient client = null;
+        try {
+            client = new HudiCatalogClient(metastoreUri, warehouse, dbName);
+            client.open();
+
+            // 1. create database if not exists
+            client.createDatabase(warehouse, true);
+            // 2. check if the table exists
+            boolean tableExists = client.tableExist(tableName);
+
+            if (!tableExists) {
+                // 3. create table
+                client.createTable(tableName, tableInfo, true);
+            } else {
+                // 4. or update table columns
+                List<HudiColumnInfo> existColumns = client.getColumns(dbName, tableName);
+                List<HudiColumnInfo> needAddColumns = tableInfo.getColumns().stream().skip(existColumns.size())
+                        .collect(toList());
+                if (CollectionUtils.isNotEmpty(needAddColumns)) {
+                    client.addColumns(tableName, needAddColumns);
+                    LOGGER.info("{} columns added for table {}", needAddColumns.size(), tableName);
+                }
+            }
+            String info = "success to create Hudi resource";
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOGGER.info(info + " for sinkInfo = {}", info);
+        } catch (Throwable e) {
+            String errMsg = "create Hudi table failed: " + e.getMessage();
+            LOGGER.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    private List<HudiColumnInfo> getColumnList(SinkInfo sinkInfo) {
+        List<StreamSinkFieldEntity> fieldList = sinkFieldMapper.selectBySinkId(sinkInfo.getId());
+
+        // set columns
+        List<HudiColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            HudiColumnInfo column = HudiColumnInfo.getFromJson(field.getExtParams());
+            column.setName(field.getFieldName());
+            column.setType(field.getFieldType());
+            column.setDesc(field.getFieldComment());
+            column.setRequired(field.getIsRequired() != null && field.getIsRequired() > 0);
+            columnList.add(column);
+        }
+
+        return columnList;
+    }
+}
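
The update branch above applies append-only schema evolution: it assumes columns already registered in the metastore keep their order, so any sink fields beyond existColumns.size() are treated as new and appended at the tail. A standalone sketch of that diff logic, with illustrative column names:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class ColumnDiffSketch {

        public static void main(String[] args) {
            // columns already registered in the metastore
            List<String> existing = Arrays.asList("id", "name");
            // columns requested by the sink configuration
            List<String> requested = Arrays.asList("id", "name", "age", "city");
            // append-only assumption: existing fields are never renamed or reordered
            List<String> needAdd = requested.stream()
                    .skip(existing.size())
                    .collect(Collectors.toList());
            System.out.println(needAdd); // prints [age, city]
        }
    }
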
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
new file mode 100644
index 000000000..953137cc2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.hudi;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkDTO;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Hudi sink operator, which saves or updates the Hudi sink info and fields.
+ */
+@Service
+public class HudiSinkOperator extends AbstractSinkOperator {
+
+    private static final String HOODIE_PRIMARY_KEY_FIELD = "hoodie.datasource.write.recordkey.field";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HudiSinkOperator.class);
+
+    private static final String CATALOG_TYPE_HIVE = "HIVE";
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.HUDI.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.HUDI;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) {
+        Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType());
+        HudiSinkRequest sinkRequest = (HudiSinkRequest) request;
+        List<HashMap<String, String>> extList = sinkRequest.getExtList();
+
+        try {
+            HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            LOGGER.error("parsing json string to sink info failed", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+        }
+    }
+
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        HudiSink sink = new HudiSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        HudiSinkDTO dto = HudiSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getCatalogUri()) && CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
+            Preconditions.checkNotEmpty(entity.getDataNodeName(),
+                    "hudi catalog uri unspecified and data node is empty");
+            HudiDataNodeInfo dataNodeInfo = (HudiDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setCatalogUri(dataNodeInfo.getUrl());
+        }
+
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+    @Override
+    protected void checkFieldInfo(SinkField field) {
+        if (FieldType.forName(field.getFieldType()) == FieldType.DECIMAL) {
+            HudiColumnInfo info = HudiColumnInfo.getFromJson(field.getExtParams());
+            if (info.getPrecision() == null || info.getScale() == null) {
+                String errorMsg = String.format("precision or scale not specified for decimal field (%s)",
+                        field.getFieldName());
+                LOGGER.error("field info check error: {}", errorMsg);
+                throw new BusinessException(errorMsg);
+            }
+            if (info.getPrecision() < info.getScale()) {
+                String errorMsg = String.format(
+                        "precision (%d) must be greater than or equal to scale (%d) for decimal field (%s)",
+                        info.getPrecision(), info.getScale(), field.getFieldName());
+                LOGGER.error("field info check error: {}", errorMsg);
+                throw new BusinessException(errorMsg);
+            }
+        }
+    }
+
+}
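
The checkFieldInfo override above enforces the standard decimal constraint: precision and scale must both be set, and precision must be at least the scale, since decimal(p, s) reserves s of its p digits for the fractional part. So decimal(10, 2) is accepted while decimal(2, 5) is rejected. A standalone sketch of the same rule, with illustrative field names:

    public class DecimalCheckSketch {

        // mirrors the constraint enforced in HudiSinkOperator#checkFieldInfo
        static void checkDecimal(Integer precision, Integer scale, String fieldName) {
            if (precision == null || scale == null) {
                throw new IllegalArgumentException(
                        "precision or scale not specified for decimal field (" + fieldName + ")");
            }
            if (precision < scale) {
                throw new IllegalArgumentException(
                        "precision must be greater than or equal to scale for decimal field (" + fieldName + ")");
            }
        }

        public static void main(String[] args) {
            checkDecimal(10, 2, "amount"); // passes: decimal(10, 2)
            checkDecimal(2, 5, "ratio");   // throws: scale exceeds precision
        }
    }
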
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HudiSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HudiSinkServiceTest.java
new file mode 100644
index 000000000..59041537d
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HudiSinkServiceTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Hudi stream sink service test.
+ */
+public class HudiSinkServiceTest extends ServiceBaseTest {
+
+    private final String globalGroupId = "b_group1";
+    private final String globalStreamId = "stream1_hudi";
+    private final String globalOperator = "admin";
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save sink info.
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator);
+        HudiSinkRequest sinkInfo = new HudiSinkRequest();
+        sinkInfo.setInlongGroupId(globalGroupId);
+        sinkInfo.setInlongStreamId(globalStreamId);
+        sinkInfo.setSinkType(SinkType.HUDI);
+        sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+        sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
+        sinkInfo.setSinkName(sinkName);
+        sinkInfo.setId((int) (Math.random() * 100000 + 1));
+        sinkInfo.setCatalogUri("thrift://127.0.0.1:9000");
+        return sinkService.save(sinkInfo, globalOperator);
+    }
+
+    @Test
+    public void testSaveAndDelete() {
+        Integer id = this.saveSink("default1");
+        Assertions.assertNotNull(id);
+        boolean result = sinkService.delete(id, false, globalOperator);
+        Assertions.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() {
+        Integer id = this.saveSink("default2");
+        StreamSink sink = sinkService.get(id);
+        Assertions.assertEquals(globalGroupId, sink.getInlongGroupId());
+        sinkService.delete(id, false, globalOperator);
+    }
+
+    @Test
+    public void testGetAndUpdate() {
+        Integer sinkId = this.saveSink("default3");
+        StreamSink streamSink = sinkService.get(sinkId);
+        Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+
+        HudiSink sink = (HudiSink) streamSink;
+        sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+        SinkRequest request = sink.genSinkRequest();
+        boolean result = sinkService.update(request, globalOperator);
+        Assertions.assertTrue(result);
+
+        sinkService.delete(sinkId, false, globalOperator);
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HudiConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HudiConstant.java
new file mode 100644
index 000000000..098038a22
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HudiConstant.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+/**
+ * Hudi option constants
+ */
+public class HudiConstant {
+
+    /**
+     * Catalog types supported by Hudi
+     */
+    public enum CatalogType {
+
+        /**
+         * Data stored in hive metastore.
+         */
+        HIVE,
+        /**
+         * Data stored in hadoop filesystem.
+         */
+        HADOOP,
+        /**
+         * Data stored in hybris metastore.
+         */
+        HYBRIS;
+
+        /**
+         * Get the CatalogType from its name
+         */
+        public static CatalogType forName(String name) {
+            for (CatalogType value : values()) {
+                if (value.name().equals(name)) {
+                    return value;
+                }
+            }
+            throw new IllegalArgumentException(String.format("Unsupported catalogType: %s", name));
+        }
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index 86306dea0..dc4669ac0 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -35,12 +35,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.InlongMetric;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
+/**
+ * The load node of hudi.
+ */
 @JsonTypeName("hudiLoad")
 @Data
 @NoArgsConstructor
@@ -49,6 +52,8 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
 
     private static final long serialVersionUID = -1L;
 
+    public static final String ENABLE_CODE = "true";
+
     private static final String HUDI_OPTION_HIVE_SYNC_ENABLED = "hive_sync.enabled";
     private static final String HUDI_OPTION_HIVE_SYNC_DB = "hive_sync.db";
     private static final String HUDI_OPTION_HIVE_SYNC_TABLE = "hive_sync.table";
@@ -59,9 +64,11 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
     private static final String HUDI_OPTION_DEFAULT_PATH = "path";
     private static final String HUDI_OPTION_DATABASE_NAME = "hoodie.database.name";
     private static final String HUDI_OPTION_TABLE_NAME = "hoodie.table.name";
-    private static final String HUDI_OPTION_RECORDKEY_FIELD_NAME = "hoodie.datasource.write.recordkey.field";
-    private static final String HUDI_OPTION_PARTITIONPATH_FIELD_NAME = "hoodie.datasource.write.partitionpath.field";
-    private static final String DDL_ATTRIBUTE_HUDI = "hudi.";
+    private static final String HUDI_OPTION_RECORD_KEY_FIELD_NAME = "hoodie.datasource.write.recordkey.field";
+    private static final String HUDI_OPTION_PARTITION_PATH_FIELD_NAME = "hoodie.datasource.write.partitionpath.field";
+    private static final String DDL_ATTR_PREFIX = "ddl.";
+    private static final String EXTEND_ATTR_KEY_NAME = "keyName";
+    private static final String EXTEND_ATTR_VALUE_NAME = "keyValue";
 
     @JsonProperty("tableName")
     @Nonnull
@@ -122,7 +129,9 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
     public Map<String, String> tableOptions() {
         Map<String, String> options = super.tableOptions();
 
-        options.put(HUDI_OPTION_HIVE_SYNC_ENABLED, "true");
+        // Synchronization to the metastore is enabled by default,
+        // and can be overridden in the front-end configuration
+        options.put(HUDI_OPTION_HIVE_SYNC_ENABLED, ENABLE_CODE);
         options.put(HUDI_OPTION_HIVE_SYNC_MODE, HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE);
         options.put(HUDI_OPTION_HIVE_SYNC_DB, dbName);
         options.put(HUDI_OPTION_HIVE_SYNC_TABLE, tableName);
@@ -134,14 +143,18 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
                     partitionFields.stream()
                             .map(FieldInfo::getName)
                             .collect(Collectors.joining(","));
-            options.put(HUDI_OPTION_PARTITIONPATH_FIELD_NAME, partitionKey);
+            options.put(HUDI_OPTION_PARTITION_PATH_FIELD_NAME, partitionKey);
         }
 
+        // If an extended attribute key starts with the 'ddl.' prefix,
+        // the stripped key/value pair is passed through to the table's DDL options
         extList.forEach(ext -> {
-            String keyName = ext.get("keyName");
+            String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
             if (StringUtils.isNoneBlank(keyName) &&
-                    keyName.startsWith(DDL_ATTRIBUTE_HUDI)) {
-                options.put(keyName, ext.get("keyValue"));
+                    keyName.startsWith(DDL_ATTR_PREFIX)) {
+                String ddlKeyName = keyName.substring(DDL_ATTR_PREFIX.length());
+                String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+                options.put(ddlKeyName, ddlValue);
             }
         });
 
@@ -150,7 +163,7 @@ public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable
 
         options.put(HUDI_OPTION_DATABASE_NAME, dbName);
         options.put(HUDI_OPTION_TABLE_NAME, tableName);
-        options.put(HUDI_OPTION_RECORDKEY_FIELD_NAME, primaryKey);
+        options.put(HUDI_OPTION_RECORD_KEY_FIELD_NAME, primaryKey);
         options.put("connector", "hudi-inlong");
 
         return options;
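
The DDL_ATTR_PREFIX handling above rewrites extended attributes: a keyName starting with "ddl." has the prefix stripped, and the remaining key/value pair is placed directly into the table options. A standalone sketch, where "write.tasks" is only an illustrative option name:

    import java.util.HashMap;
    import java.util.Map;

    public class DdlAttrSketch {

        private static final String DDL_ATTR_PREFIX = "ddl.";

        public static void main(String[] args) {
            // one extended attribute as sent by the frontend
            Map<String, String> ext = new HashMap<>();
            ext.put("keyName", "ddl.write.tasks");
            ext.put("keyValue", "4");

            Map<String, String> options = new HashMap<>();
            String keyName = ext.get("keyName");
            if (keyName != null && keyName.startsWith(DDL_ATTR_PREFIX)) {
                // strip the prefix and pass the pair through to the table options
                options.put(keyName.substring(DDL_ATTR_PREFIX.length()), ext.get("keyValue"));
            }
            System.out.println(options); // prints {write.tasks=4}
        }
    }
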
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
index dabfd54ac..ecdf88f0e 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
index 0cb9802d1..32908d557 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -40,7 +40,7 @@ import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;

