This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new e493b79d8 [INLONG-6785][Manager] Support register and manage the resource of Apache Hudi (#6790)
e493b79d8 is described below
commit e493b79d8a68d53f7fcf745b4394e14b25c65758
Author: averyzhang <[email protected]>
AuthorDate: Tue Dec 13 16:18:48 2022 +0800
    [INLONG-6785][Manager] Support register and manage the resource of Apache Hudi (#6790)
---
.../inlong/manager/client/File2HudiExample.java | 160 ++++++++++++++
.../inlong/manager/common/consts/DataNodeType.java | 1 +
.../inlong/manager/common/consts/SinkType.java | 1 +
.../plugin/flink/enums/ConnectorJarType.java | 2 +
.../plugin/listener/RestartStreamListener.java | 10 +-
.../manager/pojo/node/hudi/HudiDataNodeDTO.java | 74 +++++++
.../manager/pojo/node/hudi/HudiDataNodeInfo.java | 54 +++++
.../pojo/node/hudi/HudiDataNodeRequest.java | 49 +++++
.../manager/pojo/sink/hudi/HudiColumnInfo.java | 78 +++++++
.../manager/pojo/sink/hudi/HudiPartition.java} | 40 +++-
.../pojo/sink/hudi/HudiPartitionField.java} | 27 ++-
.../inlong/manager/pojo/sink/hudi/HudiSink.java | 91 ++++++++
.../inlong/manager/pojo/sink/hudi/HudiSinkDTO.java | 132 +++++++++++
.../manager/pojo/sink/hudi/HudiSinkRequest.java | 70 ++++++
.../manager/pojo/sink/hudi/HudiTableInfo.java} | 24 +-
.../inlong/manager/pojo/sink/hudi/HudiType.java | 60 +++++
.../manager/pojo/sort/util/LoadNodeUtils.java | 38 ++++
inlong-manager/manager-service/pom.xml | 4 +
.../service/node/hudi/HudiDataNodeOperator.java | 86 ++++++++
.../resource/sink/hudi/HudiCatalogClient.java | 242 +++++++++++++++++++++
.../resource/sink/hudi/HudiResourceOperator.java | 177 +++++++++++++++
.../service/sink/hudi/HudiSinkOperator.java | 130 +++++++++++
.../manager/service/sink/HudiSinkServiceTest.java | 94 ++++++++
.../sort/protocol/constant/HudiConstant.java | 56 +++++
.../sort/protocol/node/load/HudiLoadNode.java | 33 ++-
.../sort/protocol/node/load/HudiLoadNodeTest.java | 2 +-
.../inlong/sort/parser/HudiNodeSqlParserTest.java | 2 +-
27 files changed, 1695 insertions(+), 42 deletions(-)
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
new file mode 100644
index 000000000..3ac40649b
--- /dev/null
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiPartition;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.source.file.FileSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.shiro.util.Assert;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for file to hudi.
+ */
+@Slf4j
+@Disabled
+public class File2HudiExample extends BaseExample {
+
+ @Test
+ public void testCreateGroupForHudi() {
+ ClientConfiguration configuration = new ClientConfiguration();
+ configuration.setWriteTimeout(10);
+ configuration.setReadTimeout(10);
+ configuration.setConnectTimeout(10);
+ configuration.setTimeUnit(TimeUnit.SECONDS);
+ configuration.setAuthentication(super.getInlongAuth());
+ InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
+
+ InlongGroupInfo groupInfo = super.createGroupInfo();
+ try {
+ InlongGroup group = inlongClient.forGroup(groupInfo);
+ InlongStreamBuilder streamBuilder = group.createStream(createStreamInfo());
+ streamBuilder.fields(createStreamFields());
+ streamBuilder.source(createAgentFileSource());
+ streamBuilder.sink(createHudiSink());
+ streamBuilder.initOrUpdate();
+ // start group
+ InlongGroupContext inlongGroupContext = group.init();
+ Assert.notNull(inlongGroupContext);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void testStopGroup() {
+ ClientConfiguration configuration = new ClientConfiguration();
+ configuration.setWriteTimeout(10);
+ configuration.setReadTimeout(10);
+ configuration.setConnectTimeout(10);
+ configuration.setTimeUnit(TimeUnit.SECONDS);
+ configuration.setAuthentication(super.getInlongAuth());
+ InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
+ InlongGroupInfo groupInfo = createGroupInfo();
+ try {
+ InlongGroup group = inlongClient.forGroup(groupInfo);
+ InlongGroupContext groupContext = group.delete();
+ Assert.notNull(groupContext);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private FileSource createAgentFileSource() {
+ FileSource fileSource = new FileSource();
+ fileSource.setSourceName("{source.name}");
+ fileSource.setAgentIp("{agent.ip}");
+ fileSource.setPattern("/a/b/*.txt");
+ fileSource.setTimeOffset("-1h");
+ return fileSource;
+ }
+
+ private List<StreamField> createStreamFields() {
+ List<StreamField> streamFieldList = Lists.newArrayList();
+ streamFieldList.add(new StreamField(0, FieldType.STRING.toString(), "name", null, null));
+ streamFieldList.add(new StreamField(1, FieldType.INT.toString(), "age", null, null));
+ streamFieldList.add(new StreamField(2, FieldType.DECIMAL.toString(), "score", null, null));
+ streamFieldList.add(new StreamField(3, FieldType.TIMESTAMP.toString(), "ts", null, null));
+ return streamFieldList;
+ }
+
+ /**
+ * Create Hudi sink
+ */
+ public HudiSink createHudiSink() {
+ HudiSink sink = new HudiSink();
+
+ sink.setSinkName("{sink.name}");
+ sink.setDbName("{db.name}");
+ sink.setTableName("{table.name}");
+ sink.setCatalogUri("thrift://{ip:port}");
+ sink.setWarehouse("hdfs://{ip:port}/user/hudi/warehouse/");
+
+ final SinkField field1 = new SinkField(0, FieldType.INT.toString(), "age", FieldType.INT.toString(), "age");
+ final SinkField field2 = new SinkField(1, FieldType.STRING.toString(), "name", FieldType.STRING.toString(),
+ "name");
+ final SinkField field3 = new SinkField(2, FieldType.DECIMAL.toString(), "score", FieldType.DECIMAL.toString(),
+ "score");
+ final SinkField field4 = new SinkField(3, FieldType.TIMESTAMP.toString(), "ts", FieldType.TIMESTAMP.toString(),
+ "ts");
+
+ // field ext param
+ // field1: bucket partition example
+ HudiColumnInfo info1 = new HudiColumnInfo();
+ info1.setRequired(true);
+ info1.setPartitionStrategy(HudiPartition.BUCKET.toString());
+ info1.setBucketNum(10);
+ field1.setExtParams(JsonUtils.toJsonString(info1));
+
+ // field3: decimal column example
+ HudiColumnInfo info3 = new HudiColumnInfo();
+ info3.setScale(5);
+ info3.setPrecision(10); // scale must be less than or equal to precision
+ field3.setExtParams(JsonUtils.toJsonString(info3));
+
+ // field4: hour partition example
+ HudiColumnInfo info4 = new HudiColumnInfo();
+ info4.setPartitionStrategy(HudiPartition.HOUR.toString());
+ field4.setExtParams(JsonUtils.toJsonString(info4));
+
+ List<SinkField> fields = new ArrayList<>();
+ fields.add(field1);
+ fields.add(field2);
+ fields.add(field3);
+ fields.add(field4);
+ sink.setSinkFieldList(fields);
+ return sink;
+ }
+}
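
The per-field extParams set in createHudiSink() above are simply the JSON form of HudiColumnInfo. A minimal sketch of the round-trip, using only classes from this patch (the exact JSON keys follow the Lombok-generated property names, so the serialized shape shown in the comment is an assumption):

    // Hypothetical round-trip of the bucket-partition ext params built above
    HudiColumnInfo info = new HudiColumnInfo();
    info.setPartitionStrategy(HudiPartition.BUCKET.toString());
    info.setBucketNum(10);
    String json = JsonUtils.toJsonString(info);   // e.g. {"partitionStrategy":"BUCKET","bucketNum":10,...}
    HudiColumnInfo parsed = HudiColumnInfo.getFromJson(json);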
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 691a8db8e..16fa92c77 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -25,6 +25,7 @@ public class DataNodeType {
public static final String HIVE = "HIVE";
public static final String KAFKA = "KAFKA";
public static final String ICEBERG = "ICEBERG";
+ public static final String HUDI = "HUDI";
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String MYSQL = "MYSQL";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index 28899c7de..a2da09886 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -25,6 +25,7 @@ public class SinkType {
public static final String HIVE = "HIVE";
public static final String KAFKA = "KAFKA";
public static final String ICEBERG = "ICEBERG";
+ public static final String HUDI = "HUDI";
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String HBASE = "HBASE";
public static final String POSTGRESQL = "POSTGRESQL";
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
index bd027c75c..de2259f67 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -69,6 +69,8 @@ public enum ConnectorJarType {
ICEBERG_SINK("icebergLoad", "iceberg"),
+ HUDI_SINK("hudiLoad", "hudi"),
+
HDFS_SINK("fileSystemLoad", ""),
;
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index f0469d188..dc56ab157 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.plugin.listener;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
@@ -89,9 +90,12 @@ public class RestartStreamListener implements SortOperateListener {
Map<String, String> kvConf = new HashMap<>();
groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
- streamExtList.forEach(extInfo -> {
- kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
- });
+ // There is a possibility that the extList value is null
+ if (CollectionUtils.isNotEmpty(streamExtList)) {
+ streamExtList.forEach(extInfo -> {
+ kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
+ });
+ }
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeDTO.java
new file mode 100644
index 000000000..53a71e81c
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeDTO.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Hudi data node info")
+public class HudiDataNodeDTO {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HudiDataNodeDTO.class);
+
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ @Builder.Default
+ private String catalogType = "HIVE";
+
+ @ApiModelProperty("Hudi data warehouse dir")
+ private String warehouse;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static HudiDataNodeDTO getFromRequest(HudiDataNodeRequest request) throws Exception {
+ return HudiDataNodeDTO.builder()
+ .catalogType(request.getCatalogType())
+ .warehouse(request.getWarehouse())
+ .build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static HudiDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, HudiDataNodeDTO.class);
+ } catch (Exception e) {
+ LOGGER.error("Failed to extract additional parameters for Hudi
data node: ", e);
+ throw new
BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
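
For a data node, this DTO is what gets serialized into the extParams column (see HudiDataNodeOperator later in this patch). A sketch under the defaults shown above, with a placeholder warehouse path:

    // Hypothetical: build the DTO from a request and serialize it, as the operator does
    HudiDataNodeRequest request = new HudiDataNodeRequest();
    request.setWarehouse("hdfs://{ip:port}/user/hudi/warehouse");
    HudiDataNodeDTO dto = HudiDataNodeDTO.getFromRequest(request);
    // extParams then holds e.g. {"catalogType":"HIVE","warehouse":"hdfs://{ip:port}/user/hudi/warehouse"}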
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeInfo.java
new file mode 100644
index 000000000..4b8152b3d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hudi data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node info")
+public class HudiDataNodeInfo extends DataNodeInfo {
+
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ private String catalogType = "HIVE";
+
+ @ApiModelProperty("Hudi data warehouse dir")
+ private String warehouse;
+
+ public HudiDataNodeInfo() {
+ this.setType(DataNodeType.HUDI);
+ }
+
+ @Override
+ public HudiDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, HudiDataNodeRequest::new);
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeRequest.java
new file mode 100644
index 000000000..e7962ef6d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hudi/HudiDataNodeRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hudi data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HUDI)
+@ApiModel("Hudi data node request")
+public class HudiDataNodeRequest extends DataNodeRequest {
+
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ private String catalogType = "HIVE";
+
+ @ApiModelProperty("Hudi data warehouse dir")
+ private String warehouse;
+
+ public HudiDataNodeRequest() {
+ this.setType(DataNodeType.HUDI);
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiColumnInfo.java
new file mode 100644
index 000000000..7af82b2d2
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiColumnInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+/**
+ * Hudi column info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class HudiColumnInfo {
+
+ @ApiModelProperty("Length of fixed type")
+ private Integer length;
+
+ @ApiModelProperty("Precision of decimal type")
+ private Integer precision;
+
+ @ApiModelProperty("Scale of decimal type")
+ private Integer scale;
+
+ @ApiModelProperty("Field partition strategy, including: None, Identity,
Year, Month, Day, Hour, Bucket, Truncate")
+ private String partitionStrategy;
+
+ @ApiModelProperty("Bucket num param of bucket partition")
+ private Integer bucketNum;
+
+ @ApiModelProperty("Width param of truncate partition")
+ private Integer width;
+
+ // The following are passed from the base field and need not be part of the extra-param API
+ private String name;
+ private String type;
+ private String desc;
+ private boolean required;
+
+ private boolean isPartition;
+
+ /**
+ * Get the extra params from the JSON string
+ */
+ public static HudiColumnInfo getFromJson(String extParams) {
+ if (StringUtils.isEmpty(extParams)) {
+ return new HudiColumnInfo();
+ }
+ try {
+ return JsonUtils.parseObject(extParams, HudiColumnInfo.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartition.java
similarity index 50%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartition.java
index 691a8db8e..cec8e0f48 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartition.java
@@ -15,18 +15,40 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import org.apache.inlong.manager.common.util.Preconditions;
/**
- * Constants of data node.
+ * Hudi partition type
*/
-public class DataNodeType {
+public enum HudiPartition {
+
+ IDENTITY,
+ BUCKET,
+ TRUNCATE,
+ YEAR,
+ MONTH,
+ DAY,
+ HOUR,
+ NONE,
+ ;
- public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String MYSQL = "MYSQL";
+ /**
+ * Get partition type from name
+ */
+ public static HudiPartition forName(String name) {
+ Preconditions.checkNotNull(name, "HudiPartition should not be null");
+ for (HudiPartition value : values()) {
+ if (value.toString().equalsIgnoreCase(name)) {
+ return value;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupported HudiPartition : %s", name));
+ }
+ @Override
+ public String toString() {
+ return name();
+ }
}
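
The lookup above is case-insensitive and rejects unknown names, so callers can pass user-supplied strings directly:

    HudiPartition p = HudiPartition.forName("bucket");   // returns HudiPartition.BUCKET
    HudiPartition q = HudiPartition.forName("weekly");   // throws IllegalArgumentException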
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartitionField.java
similarity index 56%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartitionField.java
index 691a8db8e..c4e354c41 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiPartitionField.java
@@ -15,18 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
/**
- * Constants of data node.
+ * Hudi partition field info
*/
-public class DataNodeType {
+@Data
+@ApiModel("Hudi partition field")
+public class HudiPartitionField {
+
+ @ApiModelProperty("Field name")
+ private String fieldName;
+
+ @ApiModelProperty("Field type")
+ private String fieldType;
- public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String MYSQL = "MYSQL";
+ @ApiModelProperty("Field format, including: MICROSECONDS, MILLISECONDS,
SECONDS, SQL, ISO_8601"
+ + " and custom such as 'yyyy-MM-dd HH:mm:ss'. This is mainly used
for time format")
+ private String fieldFormat;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
new file mode 100644
index 000000000..54e332f88
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSink.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashMap;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+/**
+ * Hudi sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Hudi sink info")
+@JsonTypeDefine(value = SinkType.HUDI)
+public class HudiSink extends StreamSink {
+
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ @Builder.Default
+ private String catalogType = "HIVE";
+
+ @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+ private String catalogUri;
+
+ @ApiModelProperty("Hudi data warehouse dir")
+ private String warehouse;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Data path, such as:
hdfs://ip:port/user/hive/warehouse/test.db")
+ private String dataPath;
+
+ @ApiModelProperty("File format, support: Parquet, Orc, Avro")
+ private String fileFormat;
+
+ @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month,
O-once, R-regulation")
+ private String partitionType;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
+ @ApiModelProperty("Extended properties")
+ private List<HashMap<String, String>> extList;
+
+ @ApiModelProperty("Partition field list")
+ private List<HudiPartitionField> partitionFieldList;
+
+ public HudiSink() {
+ this.setSinkType(SinkType.HUDI);
+ }
+
+ @Override
+ public SinkRequest genSinkRequest() {
+ return CommonBeanUtils.copyProperties(this, HudiSinkRequest::new);
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
new file mode 100644
index 000000000..616fc56f7
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkDTO.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+/**
+ * Hudi sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class HudiSinkDTO {
+
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ @Builder.Default
+ private String catalogType = "HIVE";
+
+ @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+ private String catalogUri;
+
+ @ApiModelProperty("Hudi data warehouse dir")
+ private String warehouse;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Data path, such as:
hdfs://ip:port/user/hive/warehouse/test.db")
+ private String dataPath;
+
+ @ApiModelProperty("File format, support: Parquet, Orc, Avro")
+ private String fileFormat;
+
+ @ApiModelProperty("Partition type, like: H-hour, D-day, W-week, M-month,
O-once, R-regulation")
+ private String partitionType;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
+ @ApiModelProperty("Properties for Hudi")
+ private Map<String, Object> properties;
+
+ @ApiModelProperty("Extended properties")
+ private List<HashMap<String, String>> extList;
+
+ @ApiModelProperty("Partition field list")
+ private List<HudiPartitionField> partitionFieldList;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static HudiSinkDTO getFromRequest(HudiSinkRequest request) {
+ return HudiSinkDTO.builder()
+ .catalogUri(request.getCatalogUri())
+ .warehouse(request.getWarehouse())
+ .dbName(request.getDbName())
+ .tableName(request.getTableName())
+ .dataPath(request.getDataPath())
+ .partitionFieldList(request.getPartitionFieldList())
+ .fileFormat(request.getFileFormat())
+ .catalogType(request.getCatalogType())
+ .properties(request.getProperties())
+ .extList(request.getExtList())
+ .primaryKey(request.getPrimaryKey())
+ .build();
+ }
+
+ public static HudiSinkDTO getFromJson(@NotNull String extParams) {
+ try {
+ return JsonUtils.parseObject(extParams, HudiSinkDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+ /**
+ * Get Hudi table info
+ */
+ public static HudiTableInfo getHudiTableInfo(HudiSinkDTO hudiInfo, List<HudiColumnInfo> columnList) {
+ HudiTableInfo tableInfo = new HudiTableInfo();
+ tableInfo.setDbName(hudiInfo.getDbName());
+ tableInfo.setTableName(hudiInfo.getTableName());
+
+ // Set partition fields
+ if (CollectionUtils.isNotEmpty(hudiInfo.getPartitionFieldList())) {
+ for (HudiPartitionField field : hudiInfo.getPartitionFieldList()) {
+ HudiColumnInfo columnInfo = new HudiColumnInfo();
+ columnInfo.setName(field.getFieldName());
+ columnInfo.setPartition(true);
+ columnInfo.setType("string");
+ columnList.add(columnInfo);
+ }
+ }
+ tableInfo.setColumns(columnList);
+ tableInfo.setPrimaryKey(hudiInfo.getPrimaryKey());
+ tableInfo.setFileFormat(hudiInfo.getFileFormat());
+ tableInfo.setTblProperties(hudiInfo.getProperties());
+ return tableInfo;
+ }
+
+}
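
A sketch of how getHudiTableInfo is meant to be used (an assumption based on the signatures above; in practice the column list would be built from the sink's field list before the call):

    HudiSinkDTO dto = HudiSinkDTO.getFromJson(extParams);     // extParams: the JSON stored on the sink
    List<HudiColumnInfo> columns = new ArrayList<>();         // filled from the sink fields
    HudiTableInfo tableInfo = HudiSinkDTO.getHudiTableInfo(dto, columns);
    // any partition fields on the DTO are appended to 'columns' as string-typed partition columns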
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
new file mode 100644
index 000000000..a51d0a153
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiSinkRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.HashMap;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+/**
+ * Hudi sink request.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Hudi sink request")
+@JsonTypeDefine(value = SinkType.HUDI)
+public class HudiSinkRequest extends SinkRequest {
+
+ @ApiModelProperty("Catalog type, like: HIVE, HADOOP, default is HIVE")
+ private String catalogType = "HIVE";
+
+ @ApiModelProperty("Catalog uri, such as hive metastore thrift://ip:port")
+ private String catalogUri;
+
+ @ApiModelProperty("Hudi data warehouse dir")
+ private String warehouse;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Data path, such as:
hdfs://ip:port/user/hive/warehouse/test.db")
+ private String dataPath;
+
+ @ApiModelProperty("File format, support: Parquet, Orc, Avro")
+ private String fileFormat;
+
+ @ApiModelProperty("Extended properties")
+ private List<HashMap<String, String>> extList;
+
+ @ApiModelProperty("Partition field list")
+ private List<HudiPartitionField> partitionFieldList;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
similarity index 65%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
index 691a8db8e..24e4d7260 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiTableInfo.java
@@ -15,18 +15,24 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.consts;
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
/**
- * Constants of data node.
+ * Hudi table info
*/
-public class DataNodeType {
+@Data
+public class HudiTableInfo {
- public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String MYSQL = "MYSQL";
+ private String dbName;
+ private String tableName;
+ private String tableDesc;
+ private String fileFormat;
+ private Map<String, Object> tblProperties;
+ private List<HudiColumnInfo> columns;
+ private String primaryKey;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
new file mode 100644
index 000000000..672b07f8f
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.hudi;
+
+import lombok.Getter;
+
+/**
+ * Hudi data type
+ */
+public enum HudiType {
+
+ BOOLEAN("boolean"),
+ INT("int"),
+ LONG("long"),
+ FLOAT("float"),
+ DOUBLE("double"),
+ DECIMAL("decimal"),
+ DATE("date"),
+ TIME("time"),
+ TIMESTAMP("timestamp"),
+ TIMESTAMPTZ("timestamptz"),
+ STRING("string"),
+ UUID("uuid"),
+ FIXED("fixed"),
+ BINARY("binary");
+
+ @Getter
+ private final String type;
+
+ HudiType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * Get type from name
+ */
+ public static HudiType forType(String type) {
+ for (HudiType ibType : values()) {
+ if (ibType.getType().equalsIgnoreCase(type)) {
+ return ibType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("invalid hudi type = %s", type));
+ }
+}
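
As with HudiPartition.forName, forType matches case-insensitively and throws on unknown types:

    HudiType t = HudiType.forType("TIMESTAMPTZ");   // returns HudiType.TIMESTAMPTZ
    HudiType u = HudiType.forType("varchar");       // throws IllegalArgumentException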
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 37cf239bc..9bbed161f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
import org.apache.inlong.manager.pojo.sink.hive.HivePartitionField;
import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
@@ -46,6 +47,7 @@ import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.sort.formats.common.StringTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HudiConstant;
import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
@@ -62,6 +64,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -122,6 +125,8 @@ public class LoadNodeUtils {
return createLoadNode((ClickHouseSink) streamSink, fieldInfos, fieldRelations, properties);
case SinkType.ICEBERG:
return createLoadNode((IcebergSink) streamSink, fieldInfos, fieldRelations, properties);
+ case SinkType.HUDI:
+ return createLoadNode((HudiSink) streamSink, fieldInfos, fieldRelations, properties);
case SinkType.SQLSERVER:
return createLoadNode((SQLServerSink) streamSink, fieldInfos, fieldRelations, properties);
case SinkType.ELASTICSEARCH:
@@ -398,6 +403,39 @@ public class LoadNodeUtils {
icebergSink.getWarehouse());
}
+ /**
+ * Create load node of Hudi.
+ */
+ public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
+ HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
+ List<FieldInfo> partitionFields = Lists.newArrayList();
+ if (CollectionUtils.isNotEmpty(hudiSink.getPartitionFieldList())) {
+ partitionFields = hudiSink.getPartitionFieldList().stream()
+ .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hudiSink.getSinkName(),
+ FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(), partitionField.getFieldFormat())))
+ .collect(Collectors.toList());
+ }
+ return new HudiLoadNode(
+ hudiSink.getSinkName(),
+ hudiSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ hudiSink.getDbName(),
+ hudiSink.getTableName(),
+ hudiSink.getPrimaryKey(),
+ catalogType,
+ hudiSink.getCatalogUri(),
+ hudiSink.getWarehouse(),
+ hudiSink.getExtList(),
+ partitionFields);
+ }
+
/**
* Create load node of SQLServer.
*/
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index 8b8d6a33f..72d8d9cbc 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -557,5 +557,9 @@
<artifactId>flink-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-flink1.13-bundle</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hudi/HudiDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hudi/HudiDataNodeOperator.java
new file mode 100644
index 000000000..79b253cd9
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hudi/HudiDataNodeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.hudi;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class HudiDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HudiDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.HUDI;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+
+ HudiDataNodeInfo hudiDataNodeInfo = new HudiDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, hudiDataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ HudiDataNodeDTO dto = HudiDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, hudiDataNodeInfo);
+ }
+
+ LOGGER.debug("success to get Hudi data node from entity");
+ return hudiDataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+ HudiDataNodeRequest hudiDataNodeRequest = (HudiDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(hudiDataNodeRequest, targetEntity, true);
+ try {
+ HudiDataNodeDTO dto = HudiDataNodeDTO.getFromRequest(hudiDataNodeRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ LOGGER.error("failed to set entity for Hudi data node: ", e);
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
new file mode 100644
index 000000000..124dcaedb
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.hudi;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hudi.sync.common.util.ConfigUtils;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiTableInfo;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Catalog client for Hudi.
+ */
+public class HudiCatalogClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HudiCatalogClient.class);
+
+ private final String uri;
+ private final String dbName;
+ private final String warehouse;
+ private IMetaStoreClient client;
+ private final HiveConf hiveConf;
+
+ public HudiCatalogClient(String uri, String warehouse, String dbName) throws MetaException {
+ this.uri = uri;
+ this.warehouse = warehouse;
+ this.dbName = dbName;
+ hiveConf = new HiveConf();
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, uri);
+ hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, false);
+ }
+
+ /**
+ * Open the hive metastore connection
+ */
+ public void open() {
+ if (this.client == null) {
+ try {
+ this.client = Hive.get(hiveConf).getMSC();
+ } catch (Exception e) {
+ throw new HoodieCatalogException("Failed to create hive metastore client", e);
+ }
+ LOG.info("Connected to Hive metastore");
+ }
+ }
+
+ private void createDatabase(String warehouse, Map<String, String> meta, boolean ignoreIfExists) {
+ Database database = new Database();
+ Map<String, String> parameter = Maps.newHashMap();
+ database.setName(dbName);
+ database.setLocationUri((new Path(warehouse, dbName) + ".db"));
+ meta.forEach((key, value) -> {
+ if (key.equals("comment")) {
+ database.setDescription(value);
+ } else if (key.equals("location")) {
+ database.setLocationUri(value);
+ } else if (value != null) {
+ parameter.put(key, value);
+ }
+ });
+ // set the accumulated parameters once, after the loop, so they apply even when meta is empty
+ database.setParameters(parameter);
+ try {
+ client.createDatabase(database);
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new RuntimeException("Database '" + dbName + "' already
exist!");
+ }
+ } catch (TException e) {
+ throw new RuntimeException("Failed to create database '" + dbName
+ + "'", e);
+ }
+ }
+
+ /**
+ * Create the hudi database
+ * @param warehouse the warehouse directory in dfs
+ * @param ignoreIfExists do not fail if the database already exists
+ */
+ public void createDatabase(String warehouse, boolean ignoreIfExists) {
+ createDatabase(warehouse, Maps.newHashMap(), ignoreIfExists);
+ }
+
+ /**
+ * Check whether the table exists
+ * @param tableName the name of the hudi table
+ * @return true if the table exists
+ */
+ public boolean tableExist(String tableName) throws TException {
+ return client.tableExists(dbName, tableName);
+ }
+
+ /**
+ * Get the column info of an existing hudi table
+ * @param dbName the database name
+ * @param tableName the table name
+ */
+ public List<HudiColumnInfo> getColumns(
+ String dbName,
+ String tableName)
+ throws TException {
+ Table hiveTable = client.getTable(dbName, tableName);
+ List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
+ // filter out the metadata columns
+ .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
+ .collect(Collectors.toList());
+ allCols.addAll(hiveTable.getPartitionKeys());
+
+ return allCols.stream()
+ .map((FieldSchema s) -> {
+ HudiColumnInfo info = new HudiColumnInfo();
+ info.setName(s.getName());
+ info.setType(s.getType());
+ return info;
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Add column to hudi table at the tail
+ */
+ public void addColumns(String tableName, List<HudiColumnInfo> columns) throws TException {
+ Table hiveTable = client.getTable(dbName, tableName);
+ Table newHiveTable = hiveTable.deepCopy();
+ List<FieldSchema> cols = newHiveTable.getSd().getCols();
+ for (HudiColumnInfo column : columns) {
+ FieldSchema fieldSchema = new FieldSchema();
+ fieldSchema.setName(column.getName());
+ fieldSchema.setType(column.getType());
+ fieldSchema.setComment(column.getDesc());
+ cols.add(fieldSchema);
+ }
+ newHiveTable.getSd().setCols(cols);
+ client.alter_table(dbName, tableName, newHiveTable);
+ }
+
+ /**
+ * Create the hudi table and register it to the hive metastore
+ * @param tableName the hudi table name
+ * @param tableInfo the hudi table info
+ * @param useRealTimeInputFormat whether to use the real-time input format (otherwise the table is queried as read-optimized)
+ */
+ public void createTable(
+ String tableName,
+ HudiTableInfo tableInfo,
+ boolean useRealTimeInputFormat)
+ throws TException, IOException {
+ Table hiveTable = org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable(dbName, tableName);
+ hiveTable.setOwner(UserGroupInformation.getCurrentUser().getUserName());
+ hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
+
+ Map<String, String> properties = new HashMap<>();
+ String location = this.warehouse + "/" + dbName + ".db" + "/" + tableName;
+ properties.put("path", location);
+
+ List<FieldSchema> cols = new ArrayList<>();
+ for (HudiColumnInfo column : tableInfo.getColumns()) {
+ FieldSchema fieldSchema = new FieldSchema();
+ fieldSchema.setName(column.getName());
+ fieldSchema.setType(column.getType());
+ fieldSchema.setComment(column.getDesc());
+ cols.add(fieldSchema);
+ }
+
+ // Build storage of hudi table
+ StorageDescriptor sd = new StorageDescriptor();
+ sd.setCols(cols);
+ hiveTable.setDbName(dbName);
+ hiveTable.setTableName(tableName);
+ // FIXME: splitSchemas needs to be configured by the frontend
+
+ HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
+ // pick the input/output/serde class names for the base file format (realtime input format if requested)
+ String inputFormatClassName =
+ HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat,
useRealTimeInputFormat);
+ String outputFormatClassName =
HoodieInputFormatUtils.getOutputFormatClassName(baseFileFormat);
+ String serDeClassName =
HoodieInputFormatUtils.getSerDeClassName(baseFileFormat);
+ sd.setInputFormat(inputFormatClassName);
+ sd.setOutputFormat(outputFormatClassName);
+
+ Map<String, String> serdeProperties = new HashMap<>();
+ serdeProperties.put("path", location);
+ serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE,
String.valueOf(!useRealTimeInputFormat));
+ sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
+ sd.setLocation(location);
+ hiveTable.setSd(sd);
+
+ hiveTable.setParameters(properties);
+
+ client.createTable(hiveTable);
+ }
+
+ /**
+ * Close the connection to the Hive metastore.
+ */
+ public void close() {
+ if (client != null) {
+ client.close();
+ client = null;
+ LOG.info("Disconnect to hive metastore");
+ }
+ }
+
+}
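
For reference, a minimal usage sketch of the HudiCatalogClient added above, based only on the
methods shown in this patch; the thrift URI, warehouse path, database and table names are
placeholders, not values taken from the commit:

    // hypothetical caller; assumes HudiTableInfo was built elsewhere (e.g. from HudiSinkDTO)
    void createHudiTableIfAbsent(HudiTableInfo tableInfo) throws TException, IOException {
        HudiCatalogClient client = new HudiCatalogClient(
                "thrift://127.0.0.1:9083", "hdfs://ns/warehouse", "test_db");
        try {
            client.open();
            // create the database if it does not exist yet
            client.createDatabase("hdfs://ns/warehouse", true);
            if (!client.tableExist("test_table")) {
                // register the Hudi table in the Hive metastore
                client.createTable("test_table", tableInfo, true);
            }
        } finally {
            client.close();
        }
    }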
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiResourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiResourceOperator.java
new file mode 100644
index 000000000..17e0ed879
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiResourceOperator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.hudi;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkDTO;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Hudi resource operator
+ */
+@Service
+public class HudiResourceOperator implements SinkResourceOperator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HudiResourceOperator.class);
+
+ private static final String CATALOG_TYPE_HIVE = "HIVE";
+
+ @Autowired
+ private StreamSinkService sinkService;
+ @Autowired
+ private StreamSinkFieldEntityMapper sinkFieldMapper;
+ @Autowired
+ private DataNodeOperateHelper dataNodeHelper;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.HUDI.equals(sinkType);
+ }
+
+ /**
+ * Create Hudi table according to the sink config
+ */
+ public void createSinkResource(SinkInfo sinkInfo) {
+ if (sinkInfo == null) {
+ LOGGER.warn("sink info was null, skip to create resource");
+ return;
+ }
+
+ if
(SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+ LOGGER.warn("sink resource [" + sinkInfo.getId() + "] already
success, skip to create");
+ return;
+ } else if
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
{
+ LOGGER.warn("create resource was disabled, skip to create for [" +
sinkInfo.getId() + "]");
+ return;
+ }
+
+ this.createTableIfAbsent(sinkInfo);
+ }
+
+ private HudiSinkDTO getHudiInfo(SinkInfo sinkInfo) {
+ HudiSinkDTO hudiInfo =
HudiSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+ // read uri from data node if not supplied by user
+ if (StringUtils.isBlank(hudiInfo.getCatalogUri())
+ && CATALOG_TYPE_HIVE.equals(hudiInfo.getCatalogType())) {
+ String dataNodeName = sinkInfo.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "Hudi catalog uri not
specified and data node is empty");
+ HudiDataNodeInfo dataNodeInfo = (HudiDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ dataNodeName, sinkInfo.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, hudiInfo);
+ hudiInfo.setCatalogUri(dataNodeInfo.getUrl());
+ }
+
+ hudiInfo.setDataPath(hudiInfo.getWarehouse() + "/" +
hudiInfo.getDbName() + ".db/" + hudiInfo.getTableName());
+ return hudiInfo;
+ }
+
+ private void createTableIfAbsent(SinkInfo sinkInfo) {
+ LOGGER.info("begin to create hudi table for sinkInfo={}", sinkInfo);
+
+ // Get all info from config
+ HudiSinkDTO hudiInfo = getHudiInfo(sinkInfo);
+ List<HudiColumnInfo> columnInfoList = getColumnList(sinkInfo);
+ if (CollectionUtils.isEmpty(columnInfoList)) {
+ throw new IllegalArgumentException("no hudi columns specified");
+ }
+ HudiTableInfo tableInfo = HudiSinkDTO.getHudiTableInfo(hudiInfo,
columnInfoList);
+
+ String metastoreUri = hudiInfo.getCatalogUri();
+ String warehouse = hudiInfo.getWarehouse();
+ String dbName = hudiInfo.getDbName();
+ String tableName = hudiInfo.getTableName();
+
+ HudiCatalogClient client = null;
+ try {
+ client = new HudiCatalogClient(metastoreUri, warehouse, dbName);
+ client.open();
+
+ // 1. create database if not exists
+ client.createDatabase(warehouse, true);
+ // 2. check if the table exists
+ boolean tableExists = client.tableExist(tableName);
+
+ if (!tableExists) {
+ // 3. create table
+ client.createTable(tableName, tableInfo, true);
+ } else {
+ // 4. or update table columns
+ List<HudiColumnInfo> existColumns = client.getColumns(dbName,
tableName);
+ List<HudiColumnInfo> needAddColumns =
tableInfo.getColumns().stream().skip(existColumns.size())
+ .collect(toList());
+ if (CollectionUtils.isNotEmpty(needAddColumns)) {
+ client.addColumns(tableName, needAddColumns);
+ LOGGER.info("{} columns added for table {}",
needAddColumns.size(), tableName);
+ }
+ }
+ String info = "successfully created Hudi resource";
+ sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+ LOGGER.info(info + " for sinkInfo = {}", sinkInfo);
+ } catch (Throwable e) {
+ String errMsg = "create Hudi table failed: " + e.getMessage();
+ LOGGER.error(errMsg, e);
+ sinkService.updateStatus(sinkInfo.getId(),
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+ throw new WorkflowException(errMsg);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+
+ private List<HudiColumnInfo> getColumnList(SinkInfo sinkInfo) {
+ List<StreamSinkFieldEntity> fieldList =
sinkFieldMapper.selectBySinkId(sinkInfo.getId());
+
+ // set columns
+ List<HudiColumnInfo> columnList = new ArrayList<>();
+ for (StreamSinkFieldEntity field : fieldList) {
+ HudiColumnInfo column =
HudiColumnInfo.getFromJson(field.getExtParams());
+ column.setName(field.getFieldName());
+ column.setType(field.getFieldType());
+ column.setDesc(field.getFieldComment());
+ column.setRequired(field.getIsRequired() != null &&
field.getIsRequired() > 0);
+ columnList.add(column);
+ }
+
+ return columnList;
+ }
+}
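
The schema-evolution branch in createTableIfAbsent only appends trailing columns, and the
comparison is positional: it assumes the first existColumns.size() requested columns already match
the existing table schema in order. A short sketch of that step, with hypothetical database and
table names:

    // existing table columns: [id, name]; requested sink columns: [id, name, age, city]
    List<HudiColumnInfo> existColumns = client.getColumns("test_db", "test_table");
    List<HudiColumnInfo> needAddColumns = tableInfo.getColumns().stream()
            .skip(existColumns.size())   // keep only the columns beyond the current schema
            .collect(Collectors.toList());
    if (!needAddColumns.isEmpty()) {
        // appended at the tail of the schema, as implemented in HudiCatalogClient.addColumns()
        client.addColumns("test_table", needAddColumns);
    }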
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
new file mode 100644
index 000000000..953137cc2
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.hudi;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.hudi.HudiDataNodeInfo;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkDTO;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Hudi sink operator, such as save or update hudi field, etc.
+ */
+@Service
+public class HudiSinkOperator extends AbstractSinkOperator {
+
+ private static final String HOODIE_PRIMARY_KEY_FIELD =
"hoodie.datasource.write.recordkey.field";
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HudiSinkOperator.class);
+
+ private static final String CATALOG_TYPE_HIVE = "HIVE";
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.HUDI.equals(sinkType);
+ }
+
+ @Override
+ protected String getSinkType() {
+ return SinkType.HUDI;
+ }
+
+ @Override
+ protected void setTargetEntity(SinkRequest request, StreamSinkEntity
targetEntity) {
+
Preconditions.checkTrue(this.getSinkType().equals(request.getSinkType()),
+ ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " +
getSinkType());
+ HudiSinkRequest sinkRequest = (HudiSinkRequest) request;
+ List<HashMap<String, String>> extList = sinkRequest.getExtList();
+
+ try {
+ HudiSinkDTO dto = HudiSinkDTO.getFromRequest(sinkRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ LOGGER.error("parsing json string to sink info failed", e);
+ throw new
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+ }
+ }
+
+ @Override
+ public StreamSink getFromEntity(StreamSinkEntity entity) {
+ HudiSink sink = new HudiSink();
+ if (entity == null) {
+ return sink;
+ }
+
+ HudiSinkDTO dto = HudiSinkDTO.getFromJson(entity.getExtParams());
+ if (StringUtils.isBlank(dto.getCatalogUri()) &&
CATALOG_TYPE_HIVE.equals(dto.getCatalogType())) {
+ Preconditions.checkNotEmpty(entity.getDataNodeName(),
+ "hudi catalog uri unspecified and data node is empty");
+ HudiDataNodeInfo dataNodeInfo = (HudiDataNodeInfo)
dataNodeHelper.getDataNodeInfo(
+ entity.getDataNodeName(), entity.getSinkType());
+ CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+ dto.setCatalogUri(dataNodeInfo.getUrl());
+ }
+
+ CommonBeanUtils.copyProperties(entity, sink, true);
+ CommonBeanUtils.copyProperties(dto, sink, true);
+ List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+ sink.setSinkFieldList(sinkFields);
+ return sink;
+ }
+
+ @Override
+ protected void checkFieldInfo(SinkField field) {
+ if (FieldType.forName(field.getFieldType()) == FieldType.DECIMAL) {
+ HudiColumnInfo info =
HudiColumnInfo.getFromJson(field.getExtParams());
+ if (info.getPrecision() == null || info.getScale() == null) {
+ String errorMsg = String.format("precision or scale not
specified for decimal field (%s)",
+ field.getFieldName());
+ LOGGER.error("field info check error: {}", errorMsg);
+ throw new BusinessException(errorMsg);
+ }
+ if (info.getPrecision() < info.getScale()) {
+ String errorMsg = String.format(
+ "precision (%d) must be greater than or equal to scale (%d) for decimal field (%s)",
+ info.getPrecision(), info.getScale(), field.getFieldName());
+ LOGGER.error("field info check error: {}", errorMsg);
+ throw new BusinessException(errorMsg);
+ }
+ }
+ }
+
+}
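
checkFieldInfo above rejects decimal fields whose precision or scale is missing, or whose precision
is smaller than the scale. A hedged sketch of a field that would pass the check; the setter names
are assumed from the getters used above, and the precision/scale JSON property names are assumed
from HudiColumnInfo:

    // hypothetical decimal sink field; "amount" and the extParams JSON are examples only
    SinkField amount = new SinkField();
    amount.setFieldName("amount");
    amount.setFieldType("decimal");
    // HudiColumnInfo is parsed from extParams; precision (10) >= scale (2), so the check passes
    amount.setExtParams("{\"precision\":10,\"scale\":2}");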
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HudiSinkServiceTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HudiSinkServiceTest.java
new file mode 100644
index 000000000..59041537d
--- /dev/null
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HudiSinkServiceTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSinkRequest;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Hudi stream sink service test.
+ */
+public class HudiSinkServiceTest extends ServiceBaseTest {
+
+ private final String globalGroupId = "b_group1";
+ private final String globalStreamId = "stream1_hudi";
+ private final String globalOperator = "admin";
+
+ @Autowired
+ private StreamSinkService sinkService;
+ @Autowired
+ private InlongStreamServiceTest streamServiceTest;
+
+ /**
+ * Save sink info.
+ */
+ public Integer saveSink(String sinkName) {
+ streamServiceTest.saveInlongStream(globalGroupId, globalStreamId,
globalOperator);
+ HudiSinkRequest sinkInfo = new HudiSinkRequest();
+ sinkInfo.setInlongGroupId(globalGroupId);
+ sinkInfo.setInlongStreamId(globalStreamId);
+ sinkInfo.setSinkType(SinkType.HUDI);
+
sinkInfo.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+ sinkInfo.setDataPath("hdfs://127.0.0.1:8020/data");
+ sinkInfo.setSinkName(sinkName);
+ sinkInfo.setId((int) (Math.random() * 100000 + 1));
+ sinkInfo.setCatalogUri("thrift://127.0.0.1:9000");
+ return sinkService.save(sinkInfo, globalOperator);
+ }
+
+ @Test
+ public void testSaveAndDelete() {
+ Integer id = this.saveSink("default1");
+ Assertions.assertNotNull(id);
+ boolean result = sinkService.delete(id, false, globalOperator);
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testListByIdentifier() {
+ Integer id = this.saveSink("default2");
+ StreamSink sink = sinkService.get(id);
+ Assertions.assertEquals(globalGroupId, sink.getInlongGroupId());
+ sinkService.delete(id, false, globalOperator);
+ }
+
+ @Test
+ public void testGetAndUpdate() {
+ Integer sinkId = this.saveSink("default3");
+ StreamSink streamSink = sinkService.get(sinkId);
+ Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId());
+
+ HudiSink sink = (HudiSink) streamSink;
+ sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE);
+ SinkRequest request = sink.genSinkRequest();
+ boolean result = sinkService.update(request, globalOperator);
+ Assertions.assertTrue(result);
+
+ sinkService.delete(sinkId, false, globalOperator);
+ }
+
+}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HudiConstant.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HudiConstant.java
new file mode 100644
index 000000000..098038a22
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/HudiConstant.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.constant;
+
+/**
+ * Hudi option constant
+ */
+public class HudiConstant {
+
+ /**
+ * Hudi supported catalog type
+ */
+ public enum CatalogType {
+
+ /**
+ * Data stored in hive metastore.
+ */
+ HIVE,
+ /**
+ * Data stored in hadoop filesystem.
+ */
+ HADOOP,
+ /**
+ * Data stored in hybris metastore.
+ */
+ HYBRIS;
+
+ /**
+ * Get the CatalogType matching the given name.
+ */
+ public static CatalogType forName(String name) {
+ for (CatalogType value : values()) {
+ if (value.name().equals(name)) {
+ return value;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupported catalogType: %s", name));
+ }
+ }
+}
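
A small usage sketch of the enum lookup added above (the input string is just an example):

    // resolves a catalog type by its exact enum name; an unknown name throws IllegalArgumentException
    HudiConstant.CatalogType type = HudiConstant.CatalogType.forName("HIVE");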
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index 86306dea0..dc4669ac0 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -35,12 +35,15 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.InlongMetric;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+/**
+ * The load node of hudi.
+ */
@JsonTypeName("hudiLoad")
@Data
@NoArgsConstructor
@@ -49,6 +52,8 @@ public class HudiLoadNode extends LoadNode implements
InlongMetric, Serializable
private static final long serialVersionUID = -1L;
+ public static final String ENABLE_CODE = "true";
+
private static final String HUDI_OPTION_HIVE_SYNC_ENABLED =
"hive_sync.enabled";
private static final String HUDI_OPTION_HIVE_SYNC_DB = "hive_sync.db";
private static final String HUDI_OPTION_HIVE_SYNC_TABLE =
"hive_sync.table";
@@ -59,9 +64,11 @@ public class HudiLoadNode extends LoadNode implements
InlongMetric, Serializable
private static final String HUDI_OPTION_DEFAULT_PATH = "path";
private static final String HUDI_OPTION_DATABASE_NAME =
"hoodie.database.name";
private static final String HUDI_OPTION_TABLE_NAME = "hoodie.table.name";
- private static final String HUDI_OPTION_RECORDKEY_FIELD_NAME =
"hoodie.datasource.write.recordkey.field";
- private static final String HUDI_OPTION_PARTITIONPATH_FIELD_NAME =
"hoodie.datasource.write.partitionpath.field";
- private static final String DDL_ATTRIBUTE_HUDI = "hudi.";
+ private static final String HUDI_OPTION_RECORD_KEY_FIELD_NAME =
"hoodie.datasource.write.recordkey.field";
+ private static final String HUDI_OPTION_PARTITION_PATH_FIELD_NAME =
"hoodie.datasource.write.partitionpath.field";
+ private static final String DDL_ATTR_PREFIX = "ddl.";
+ private static final String EXTEND_ATTR_KEY_NAME = "keyName";
+ private static final String EXTEND_ATTR_VALUE_NAME = "keyValue";
@JsonProperty("tableName")
@Nonnull
@@ -122,7 +129,9 @@ public class HudiLoadNode extends LoadNode implements
InlongMetric, Serializable
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put(HUDI_OPTION_HIVE_SYNC_ENABLED, "true");
+ // Synchronization to Metastore is enabled by default,
+ // which can be modified in the front-end configuration
+ options.put(HUDI_OPTION_HIVE_SYNC_ENABLED, ENABLE_CODE);
options.put(HUDI_OPTION_HIVE_SYNC_MODE,
HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE);
options.put(HUDI_OPTION_HIVE_SYNC_DB, dbName);
options.put(HUDI_OPTION_HIVE_SYNC_TABLE, tableName);
@@ -134,14 +143,18 @@ public class HudiLoadNode extends LoadNode implements
InlongMetric, Serializable
partitionFields.stream()
.map(FieldInfo::getName)
.collect(Collectors.joining(","));
- options.put(HUDI_OPTION_PARTITIONPATH_FIELD_NAME, partitionKey);
+ options.put(HUDI_OPTION_PARTITION_PATH_FIELD_NAME, partitionKey);
}
+ // If an extended attribute key starts with "ddl.",
+ // the prefix is stripped and the rest is passed through as a table DDL option
extList.forEach(ext -> {
- String keyName = ext.get("keyName");
+ String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
if (StringUtils.isNoneBlank(keyName) &&
- keyName.startsWith(DDL_ATTRIBUTE_HUDI)) {
- options.put(keyName, ext.get("keyValue"));
+ keyName.startsWith(DDL_ATTR_PREFIX)) {
+ String ddlKeyName =
keyName.substring(DDL_ATTR_PREFIX.length());
+ String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+ options.put(ddlKeyName, ddlValue);
}
});
@@ -150,7 +163,7 @@ public class HudiLoadNode extends LoadNode implements
InlongMetric, Serializable
options.put(HUDI_OPTION_DATABASE_NAME, dbName);
options.put(HUDI_OPTION_TABLE_NAME, tableName);
- options.put(HUDI_OPTION_RECORDKEY_FIELD_NAME, primaryKey);
+ options.put(HUDI_OPTION_RECORD_KEY_FIELD_NAME, primaryKey);
options.put("connector", "hudi-inlong");
return options;
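
As an illustration of the "ddl." prefix handling in tableOptions() above, a hypothetical extended
attribute would be passed through like this ("write.tasks" is only an example key, not one defined
by this patch):

    Map<String, String> ext = new HashMap<>();
    ext.put("keyName", "ddl.write.tasks");
    ext.put("keyValue", "4");
    // tableOptions() strips the "ddl." prefix, so the resulting table options contain: write.tasks -> 4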
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
index dabfd54ac..ecdf88f0e 100644
---
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
/**
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
index 0cb9802d1..32908d557 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -40,7 +40,7 @@ import
org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;