This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 03933e4072 [INLONG-9783][Sort] Add compatibility processing of tid to
streamId changes in the message deserialization base class (#9785)
03933e4072 is described below
commit 03933e40722a15a50225c52396ac5f78a4897f2b
Author: baomingyu <[email protected]>
AuthorDate: Thu Mar 7 16:42:57 2024 +0800
[INLONG-9783][Sort] Add compatibility processing of tid to streamId changes
in the message deserialization base class (#9785)
---
.../deserialization/InLongMsgCsv2DeserializationInfo.java | 7 +++++--
.../deserialization/InLongMsgCsvDeserializationInfo.java | 9 ++++++---
.../protocol/deserialization/InLongMsgDeserializationInfo.java | 10 +++++++++-
.../deserialization/InLongMsgKvDeserializationInfo.java | 7 +++++--
.../deserialization/InLongMsgTlogCsvDeserializationInfo.java | 7 +++++--
.../deserialization/InLongMsgTlogKvDeserializationInfo.java | 7 +++++--
.../sort/protocol/deserialization/JsonDeserializationInfo.java | 3 +++
.../sort/protocol/deserialization/KvDeserializationInfo.java | 5 ++++-
8 files changed, 42 insertions(+), 13 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 0b8e45d241..cb892c64fb 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -28,6 +30,7 @@ import java.util.Objects;
/**
* It represents CSV2 format of InLongMsg(m=9).
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationInfo {
private static final long serialVersionUID = 2188769102604850019L;
@@ -39,14 +42,14 @@ public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationIn
private final Character escapeChar;
public InLongMsgCsv2DeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter) {
this(streamId, delimiter, null);
}
@JsonCreator
public InLongMsgCsv2DeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
super(streamId);
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index d5817b4503..197be48b47 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
/**
* It represents CSV format of InLongMsg(m=0).
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInfo {
private static final long serialVersionUID = 1499370571949888870L;
@@ -43,13 +46,13 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
private final boolean deleteHeadDelimiter;
public InLongMsgCsvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter) {
this(streamId, delimiter, null, false);
}
public InLongMsgCsvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("delete_head_delimiter") boolean
deleteHeadDelimiter) {
this(streamId, delimiter, null, deleteHeadDelimiter);
@@ -57,7 +60,7 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
@JsonCreator
public InLongMsgCsvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("delete_head_delimiter") boolean
deleteHeadDelimiter) {
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
index 357a4fb886..723031e9a3 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -24,18 +26,24 @@ import static
com.google.common.base.Preconditions.checkNotNull;
/**
* InLongMsgDeserializationInfo.
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class InLongMsgDeserializationInfo implements
DeserializationInfo {
private static final long serialVersionUID = 3707412713264864315L;
private final String streamId;
- public InLongMsgDeserializationInfo(@JsonProperty("streamId") String
streamId) {
+ public InLongMsgDeserializationInfo(@JsonProperty("streamId")
@JsonAlias(value = {"tid"}) String streamId) {
this.streamId = checkNotNull(streamId);
}
@JsonProperty("streamId")
+ @JsonAlias(value = {"tid"})
public String getStreamId() {
return streamId;
}
+
+ public String getTid() {
+ return streamId;
+ }
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 99ff27f8ba..de999da6ff 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
/**
* It represents KV format of InLongMsg(m=5).
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo {
private static final long serialVersionUID = 8431516458466278968L;
@@ -46,7 +49,7 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
private final Character lineDelimiter;
public InLongMsgKvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
this(streamId, entryDelimiter, kvDelimiter, null, null);
@@ -54,7 +57,7 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
@JsonCreator
public InLongMsgKvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index 223a12d2b5..ba935da023 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
/**
* It represents TLog CSV format of InLongMsg(m=10).
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializationInfo {
private static final long serialVersionUID = -6585242216925992303L;
@@ -40,14 +43,14 @@ public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializatio
private final Character escapeChar;
public InLongMsgTlogCsvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter) {
this(streamId, delimiter, null);
}
@JsonCreator
public InLongMsgTlogCsvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
super(streamId);
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index 77ad77ac82..6ec72fdd7a 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
/**
* It represents TLog KV format of InLongMsg(m=15).
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserializationInfo {
private static final long serialVersionUID = 3299931901024581425L;
@@ -44,7 +47,7 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
private final Character escapeChar;
public InLongMsgTlogKvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
@@ -53,7 +56,7 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
@JsonCreator
public InLongMsgTlogKvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
index 8b19ad729b..110c3c44aa 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/JsonDeserializationInfo.java
@@ -17,9 +17,12 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
/**
* Json deserialization info
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class JsonDeserializationInfo implements DeserializationInfo {
private static final long serialVersionUID = -5344203248610337314L;
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index 583316c783..dd14eaebdd 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -18,7 +18,9 @@
package org.apache.inlong.sort.protocol.deserialization;
import org.apache.commons.lang3.StringUtils;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,6 +31,7 @@ import java.util.Objects;
/**
* Kv deserialization info
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class KvDeserializationInfo extends InLongMsgDeserializationInfo {
private static final long serialVersionUID = -3182881360079888043L;
@@ -58,7 +61,7 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
@JsonCreator
public KvDeserializationInfo(
- @JsonProperty("streamId") String streamId,
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {