This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e507c0ecc [INLONG-4092][Manager] Add primary key in Kafka source (#4094)
e507c0ecc is described below
commit e507c0ecc0618c1afb4562323ca58418caa5c118
Author: kipshi <[email protected]>
AuthorDate: Fri May 6 20:01:35 2022 +0800
[INLONG-4092][Manager] Add primary key in Kafka source (#4094)
* Add primary key in Kafka source
* Change the field doc
Co-authored-by: healchow <[email protected]>
---
.../org/apache/inlong/manager/client/api/source/KafkaSource.java | 3 +++
.../inlong/manager/client/api/util/InlongStreamSourceTransfer.java | 2 ++
.../apache/inlong/manager/client/api/util/InlongStreamTransfer.java | 2 +-
.../inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java | 4 ++++
.../manager/common/pojo/source/kafka/KafkaSourceListResponse.java | 3 +++
.../inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java | 3 +++
.../inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java | 3 +++
.../apache/inlong/manager/service/sort/util/ExtractNodeUtils.java | 5 +++--
8 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index f0ff7c70e..2b9c1dcf6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -79,4 +79,7 @@ public class KafkaSource extends StreamSource {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
+
+ @ApiModelProperty("Primary key, needed when data format is csv, json, avro")
+ private String primaryKey;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index af9ce5b32..a2b5096d1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -102,6 +102,7 @@ public class InlongStreamSourceTransfer {
kafkaSource.setIgnoreParseErrors(response.isIgnoreParseErrors());
kafkaSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
kafkaSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
+ kafkaSource.setPrimaryKey(response.getPrimaryKey());
return kafkaSource;
}
@@ -174,6 +175,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setTablePattern(kafkaSource.getTablePattern());
sourceRequest.setIgnoreParseErrors(kafkaSource.isIgnoreParseErrors());
sourceRequest.setTimestampFormatStandard(kafkaSource.getTimestampFormatStandard());
+ sourceRequest.setPrimaryKey(kafkaSource.getPrimaryKey());
sourceRequest.setFieldList(InlongStreamTransfer.createStreamFields(kafkaSource.getFields(),
streamInfo));
return sourceRequest;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index d85d8009b..ed4192927 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -21,10 +21,10 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import java.util.List;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index 4eb227f5c..e239d813f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -85,6 +85,9 @@ public class KafkaSourceDTO {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard;
+ @ApiModelProperty("Field needed when serializationType is csv,json,avro")
+ private String primaryKey;
+
/**
* Get the dto instance from the request
*/
@@ -102,6 +105,7 @@ public class KafkaSourceDTO {
.tablePattern(request.getTablePattern())
.ignoreParseErrors(request.isIgnoreParseErrors())
.timestampFormatStandard(request.getTimestampFormatStandard())
+ .primaryKey(request.getPrimaryKey())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 9190bd06d..6502ed1b8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -57,4 +57,7 @@ public class KafkaSourceListResponse extends SourceListResponse {
notes = "including earliest, latest (the default), none")
private String autoOffsetReset;
+ @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+ private String primaryKey;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 86e0168dc..04ae6ac07 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -73,6 +73,9 @@ public class KafkaSourceRequest extends SourceRequest {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard = "SQL";
+ @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+ private String primaryKey;
+
public KafkaSourceRequest() {
this.setSourceType(SourceType.KAFKA.toString());
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 3055db19f..e640742f8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -67,6 +67,9 @@ public class KafkaSourceResponse extends SourceResponse {
@ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
private String timestampFormatStandard;
+ @ApiModelProperty("Primary key, needed when serialization type is csv, json, avro")
+ private String primaryKey;
+
public KafkaSourceResponse() {
this.setSourceType(SourceType.KAFKA.name());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index aede87357..6ac8612b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -100,7 +100,7 @@ public class ExtractNodeUtils {
.collect(Collectors.toList());
final String serverTimeZone = binlogSourceResponse.getServerTimezone();
boolean incrementalSnapshotEnabled = true;
-
+
// TODO Needs to be configurable for those parameters
Map<String, String> properties = Maps.newHashMap();
if (binlogSourceResponse.isAllMigration()) {
@@ -176,6 +176,7 @@ public class ExtractNodeUtils {
default:
startupMode = ScanStartupMode.LATEST_OFFSET;
}
+ final String primaryKey = kafkaSourceResponse.getPrimaryKey();
return new KafkaExtractNode(id,
name,
@@ -186,6 +187,6 @@ public class ExtractNodeUtils {
bootstrapServers,
format,
startupMode,
- null);
+ primaryKey);
}
}