This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 7f002a390 [INLONG-5940][Sort] Compatible with the old version of sort
protocol with mysql and kafka extract node (#5945)
7f002a390 is described below
commit 7f002a3902d405f1ca90276b1f4e64da444f3f01
Author: Charles <[email protected]>
AuthorDate: Tue Sep 20 14:46:33 2022 +0800
[INLONG-5940][Sort] Compatible with the old version of sort protocol with
mysql and kafka extract node (#5945)
---
.../protocol/node/extract/KafkaExtractNode.java | 22 +++++++++++++++++++++-
.../protocol/node/extract/MySqlExtractNode.java | 15 ++++++---------
2 files changed, 27 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index f9848f4bd..4ef7ec9aa 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -22,6 +22,8 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.common.enums.MetaField;
@@ -52,6 +54,7 @@ import java.util.Set;
*/
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("kafkaExtract")
+@JsonInclude(Include.NON_NULL)
@Data
public class KafkaExtractNode extends ExtractNode implements InlongMetric,
Metadata, Serializable {
@@ -79,6 +82,21 @@ public class KafkaExtractNode extends ExtractNode implements
InlongMetric, Metad
@JsonProperty("scanSpecificOffsets")
private String scanSpecificOffsets;
+ public KafkaExtractNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @Nullable @JsonProperty("watermarkField") WatermarkField
watermarkField,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("topic") String topic,
+ @Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
+ @Nonnull @JsonProperty("format") Format format,
+ @JsonProperty("scanStartupMode") KafkaScanStartupMode
kafkaScanStartupMode,
+ @JsonProperty("primaryKey") String primaryKey,
+ @JsonProperty("groupId") String groupId) {
+ this(id, name, fields, watermarkField, properties, topic,
bootstrapServers, format, kafkaScanStartupMode,
+ primaryKey, groupId, null);
+ }
+
@JsonCreator
public KafkaExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -130,7 +148,9 @@ public class KafkaExtractNode extends ExtractNode
implements InlongMetric, Metad
} else if (format instanceof CanalJsonFormat || format instanceof
DebeziumJsonFormat) {
options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
options.put(KafkaConstant.SCAN_STARTUP_MODE,
kafkaScanStartupMode.getValue());
- options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS,
scanSpecificOffsets);
+ if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
+ options.put(KafkaConstant.SCAN_STARTUP_SPECIFIC_OFFSETS,
scanSpecificOffsets);
+ }
options.putAll(format.generateOptions(false));
} else {
throw new IllegalArgumentException("kafka extract node format is
IllegalArgument");
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
index b1d0f2783..62bc19530 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/MySqlExtractNode.java
@@ -47,12 +47,12 @@ import java.util.Set;
*/
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("mysqlExtract")
+@JsonInclude(Include.NON_NULL)
@Data
public class MySqlExtractNode extends ExtractNode implements Metadata,
InlongMetric, Serializable {
private static final long serialVersionUID = -5521981462461235277L;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("primaryKey")
private String primaryKey;
@JsonProperty("tableNames")
@@ -66,16 +66,12 @@ public class MySqlExtractNode extends ExtractNode
implements Metadata, InlongMet
private String password;
@JsonProperty("database")
private String database;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("port")
private Integer port;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("serverId")
private Integer serverId;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("incrementalSnapshotEnabled")
private Boolean incrementalSnapshotEnabled;
- @JsonInclude(Include.NON_NULL)
@JsonProperty("serverTimeZone")
private String serverTimeZone;
@Nonnull
@@ -193,13 +189,14 @@ public class MySqlExtractNode extends ExtractNode
implements Metadata, InlongMet
super(id, name, fields, watermarkField, properties);
this.tableNames = Preconditions.checkNotNull(tableNames, "tableNames
is null");
Preconditions.checkState(!tableNames.isEmpty(), "tableNames is empty");
- if (extractMode == ExtractMode.CDC) {
- this.hostname = Preconditions.checkNotNull(hostname, "hostname is
null");
- this.database = Preconditions.checkNotNull(database, "database is
null");
- } else {
+ if (extractMode == ExtractMode.SCAN) {
this.hostname = hostname;
this.database = database;
this.url = Preconditions.checkNotNull(url, "url is null");
+ } else {
+ extractMode = ExtractMode.CDC;
+ this.hostname = Preconditions.checkNotNull(hostname, "hostname is
null");
+ this.database = Preconditions.checkNotNull(database, "database is
null");
}
this.username = Preconditions.checkNotNull(username, "username is
null");
this.password = Preconditions.checkNotNull(password, "password is
null");