This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e507c0ecc [INLONG-4092][Manager] Add primary key in Kafka source (#4094)
e507c0ecc is described below

commit e507c0ecc0618c1afb4562323ca58418caa5c118
Author: kipshi <[email protected]>
AuthorDate: Fri May 6 20:01:35 2022 +0800

    [INLONG-4092][Manager] Add primary key in Kafka source (#4094)
    
    * Add primary key in Kafka source
    
    * Change the field doc
    
    Co-authored-by: healchow <[email protected]>
---
 .../org/apache/inlong/manager/client/api/source/KafkaSource.java     | 3 +++
 .../inlong/manager/client/api/util/InlongStreamSourceTransfer.java   | 2 ++
 .../apache/inlong/manager/client/api/util/InlongStreamTransfer.java  | 2 +-
 .../inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java      | 4 ++++
 .../manager/common/pojo/source/kafka/KafkaSourceListResponse.java    | 3 +++
 .../inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java  | 3 +++
 .../inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java | 3 +++
 .../apache/inlong/manager/service/sort/util/ExtractNodeUtils.java    | 5 +++--
 8 files changed, 22 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index f0ff7c70e..2b9c1dcf6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -79,4 +79,7 @@ public class KafkaSource extends StreamSource {
 
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
+
+    @ApiModelProperty("Primary key, needed when data format is csv, json, avro")
+    private String primaryKey;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index af9ce5b32..a2b5096d1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -102,6 +102,7 @@ public class InlongStreamSourceTransfer {
         kafkaSource.setIgnoreParseErrors(response.isIgnoreParseErrors());
         kafkaSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
         kafkaSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
+        kafkaSource.setPrimaryKey(response.getPrimaryKey());
         return kafkaSource;
     }
 
@@ -174,6 +175,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setTablePattern(kafkaSource.getTablePattern());
         sourceRequest.setIgnoreParseErrors(kafkaSource.isIgnoreParseErrors());
         sourceRequest.setTimestampFormatStandard(kafkaSource.getTimestampFormatStandard());
+        sourceRequest.setPrimaryKey(kafkaSource.getPrimaryKey());
         sourceRequest.setFieldList(InlongStreamTransfer.createStreamFields(kafkaSource.getFields(), streamInfo));
         return sourceRequest;
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index d85d8009b..ed4192927 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -21,10 +21,10 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
 import java.util.List;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 4eb227f5c..e239d813f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -85,6 +85,9 @@ public class KafkaSourceDTO {
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard;
 
+    @ApiModelProperty("Field needed when serializationType is csv,json,avro")
+    private String primaryKey;
+
     /**
      * Get the dto instance from the request
      */
@@ -102,6 +105,7 @@ public class KafkaSourceDTO {
                 .tablePattern(request.getTablePattern())
                 .ignoreParseErrors(request.isIgnoreParseErrors())
                 .timestampFormatStandard(request.getTimestampFormatStandard())
+                .primaryKey(request.getPrimaryKey())
                 .build();
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 9190bd06d..6502ed1b8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -57,4 +57,7 @@ public class KafkaSourceListResponse extends SourceListResponse {
             notes = "including earliest, latest (the default), none")
     private String autoOffsetReset;
 
+    @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+    private String primaryKey;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 86e0168dc..04ae6ac07 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -73,6 +73,9 @@ public class KafkaSourceRequest extends SourceRequest {
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard = "SQL";
 
+    @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+    private String primaryKey;
+
     public KafkaSourceRequest() {
         this.setSourceType(SourceType.KAFKA.toString());
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 3055db19f..e640742f8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -67,6 +67,9 @@ public class KafkaSourceResponse extends SourceResponse {
     @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
     private String timestampFormatStandard;
 
+    @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+    private String primaryKey;
+
     public KafkaSourceResponse() {
         this.setSourceType(SourceType.KAFKA.name());
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index aede87357..6ac8612b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -100,7 +100,7 @@ public class ExtractNodeUtils {
                 .collect(Collectors.toList());
         final String serverTimeZone = binlogSourceResponse.getServerTimezone();
         boolean incrementalSnapshotEnabled = true;
-        
+
         // TODO Needs to be configurable for those parameters
         Map<String, String> properties = Maps.newHashMap();
         if (binlogSourceResponse.isAllMigration()) {
@@ -176,6 +176,7 @@ public class ExtractNodeUtils {
             default:
                 startupMode = ScanStartupMode.LATEST_OFFSET;
         }
+        final String primaryKey = kafkaSourceResponse.getPrimaryKey();
 
         return new KafkaExtractNode(id,
                 name,
@@ -186,6 +187,6 @@ public class ExtractNodeUtils {
                 bootstrapServers,
                 format,
                 startupMode,
-                null);
+                primaryKey);
     }
 }

Reply via email to