This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new afaff30d9 [INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)
afaff30d9 is described below

commit afaff30d9fec593f07bbb9edc83bf688475f596a
Author: feat <[email protected]>
AuthorDate: Sun Jan 29 13:03:49 2023 +0800

    [INLONG-7271][Manager] Support comma separation for primary key and partition key of Hudi table (#7272)
---
 .../manager/service/sink/hudi/HudiSinkOperator.java   | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
index e5613d4c9..6b3ff7d9c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hudi/HudiSinkOperator.java
@@ -18,10 +18,15 @@
 package org.apache.inlong.manager.service.sink.hudi;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
@@ -77,20 +82,22 @@ public class HudiSinkOperator extends AbstractSinkOperator {
 
         String partitionKey = sinkRequest.getPartitionKey();
         String primaryKey = sinkRequest.getPrimaryKey();
-        boolean primaryKeyExist = StringUtils.isNotEmpty(partitionKey);
-        boolean partitionKeyExist = StringUtils.isNotEmpty(primaryKey);
+        boolean primaryKeyExist = StringUtils.isNotBlank(primaryKey);
+        boolean partitionKeyExist = StringUtils.isNotBlank(partitionKey);
         if (primaryKeyExist || partitionKeyExist) {
             Set<String> fieldNames = sinkRequest.getSinkFieldList().stream().map(SinkField::getFieldName)
                     .collect(Collectors.toSet());
-            if (primaryKeyExist) {
-                if (!fieldNames.contains(partitionKey)) {
+            if (partitionKeyExist) {
+                List<String> partitionKeys = Arrays.asList(partitionKey.split(InlongConstants.COMMA));
+                if (!CollectionUtils.isSubCollection(partitionKeys, fieldNames)) {
                     throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
                             String.format("The partitionKey(%s) must be included in the sinkFieldList(%s)",
                                     partitionKey, fieldNames));
                 }
             }
-            if (partitionKeyExist) {
-                if (!fieldNames.contains(primaryKey)) {
+            if (primaryKeyExist) {
+                List<String> primaryKeys = Arrays.asList(primaryKey.split(InlongConstants.COMMA));
+                if (!CollectionUtils.isSubCollection(primaryKeys, fieldNames)) {
                     throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
                             String.format("The primaryKey(%s) must be included in the sinkFieldList(%s)",
                                     primaryKey, fieldNames));

Reply via email to