This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5ff109f6d1 [INLONG-9333][Sort] Related problem with attribute
exceptions when creating 'hudiSink' (#9334)
5ff109f6d1 is described below
commit 5ff109f6d15c80822879e4f032548d40c4f6077b
Author: LiJie20190102 <[email protected]>
AuthorDate: Mon Nov 27 17:16:52 2023 +0800
[INLONG-9333][Sort] Related problem with attribute exceptions when creating
'hudiSink' (#9334)
Co-authored-by: lijie0203 <[email protected]>
---
.../inlong/manager/client/File2HudiExample.java | 8 ++++++++
.../sort/protocol/node/extract/HudiExtractNode.java | 21 ++++++++++++---------
.../sort/protocol/node/load/HudiLoadNode.java | 21 ++++++++++++---------
3 files changed, 32 insertions(+), 18 deletions(-)
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
index 6162d7fd53..023ea92433 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HudiExample.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -157,6 +158,13 @@ public class File2HudiExample extends BaseExample {
fields.add(field3);
fields.add(field4);
sink.setSinkFieldList(fields);
+
+ List<HashMap<String, String>> extList = new ArrayList<>();
+ HashMap<String, String> map = new HashMap<>();
+ map.put("hoodie.datasource.hive_sync.partition_fields", "name");
+ extList.add(map);
+ sink.setExtList(extList);
+ sink.setPrimaryKey("name");
return sink;
}
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
index cc945ac5e6..6b34a174c7 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/HudiExtractNode.java
@@ -25,6 +25,7 @@ import
org.apache.inlong.sort.protocol.transformation.WatermarkField;
import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
@@ -142,15 +143,17 @@ public class HudiExtractNode extends ExtractNode
implements Serializable {
// If the extend attributes starts with .ddl,
// it will be passed to the ddl statement of the table
- extList.forEach(ext -> {
- String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
- if (StringUtils.isNoneBlank(keyName) &&
- keyName.startsWith(DDL_ATTR_PREFIX)) {
- String ddlKeyName =
keyName.substring(DDL_ATTR_PREFIX.length());
- String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
- options.put(ddlKeyName, ddlValue);
- }
- });
+ if (CollectionUtils.isNotEmpty(extList)) {
+ extList.forEach(ext -> {
+ String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+ if (StringUtils.isNoneBlank(keyName) &&
+ keyName.startsWith(DDL_ATTR_PREFIX)) {
+ String ddlKeyName =
keyName.substring(DDL_ATTR_PREFIX.length());
+ String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+ options.put(ddlKeyName, ddlValue);
+ }
+ });
+ }
String path = String.format("%s/%s.db/%s", warehouse, dbName,
tableName);
options.put(HUDI_OPTION_DEFAULT_PATH, path);
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
index e411f89a8a..859c80cbda 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -145,15 +146,17 @@ public class HudiLoadNode extends LoadNode implements
InlongMetric, Serializable
// If the extend attributes starts with .ddl,
// it will be passed to the ddl statement of the table
- extList.forEach(ext -> {
- String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
- if (StringUtils.isNoneBlank(keyName) &&
- keyName.startsWith(DDL_ATTR_PREFIX)) {
- String ddlKeyName =
keyName.substring(DDL_ATTR_PREFIX.length());
- String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
- options.put(ddlKeyName, ddlValue);
- }
- });
+ if (CollectionUtils.isNotEmpty(extList)) {
+ extList.forEach(ext -> {
+ String keyName = ext.get(EXTEND_ATTR_KEY_NAME);
+ if (StringUtils.isNoneBlank(keyName) &&
+ keyName.startsWith(DDL_ATTR_PREFIX)) {
+ String ddlKeyName =
keyName.substring(DDL_ATTR_PREFIX.length());
+ String ddlValue = ext.get(EXTEND_ATTR_VALUE_NAME);
+ options.put(ddlKeyName, ddlValue);
+ }
+ });
+ }
String path = String.format("%s/%s.db/%s", warehouse, dbName,
tableName);
options.put(HUDI_OPTION_DEFAULT_PATH, path);