This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d9df926330 [INLONG-9622][Sort] Update all deserializationInfo in 
sort-common module (#9623)
d9df926330 is described below

commit d9df926330ec22e78e4867b388a72ef491343361
Author: baomingyu <[email protected]>
AuthorDate: Mon Jan 29 14:21:25 2024 +0800

    [INLONG-9622][Sort] Update all deserializationInfo in sort-common module 
(#9623)
---
 .../deserialization/CsvDeserializationInfo.java    | 60 ++++++++++++++++++-
 .../deserialization/DeserializationInfo.java       |  1 +
 .../InLongMsgCsv2DeserializationInfo.java          | 44 +++++++++++++-
 .../InLongMsgCsvDeserializationInfo.java           | 42 +++++++++++++-
 .../InLongMsgKvDeserializationInfo.java            | 59 ++++++++++++++++++-
 .../InLongMsgTlogCsvDeserializationInfo.java       | 45 ++++++++++++++-
 .../InLongMsgTlogKvDeserializationInfo.java        | 48 +++++++++++++++-
 .../deserialization/KvDeserializationInfo.java     | 67 ++++++++++++++++++++--
 .../org/apache/inlong/sort/formats/csv/Csv.java    |  2 +-
 .../apache/inlong/sort/formats/csv/CsvTest.java    |  4 +-
 .../formats/inlongmsgbinlog/InLongMsgBinlog.java   |  2 +-
 .../inlongmsgbinlog/InLongMsgBinlogTest.java       |  2 +-
 .../sort/formats/inlongmsgcsv/InLongMsgCsv.java    |  2 +-
 .../formats/inlongmsgcsv/InLongMsgCsvTest.java     |  2 +-
 .../sort/formats/inlongmsgkv/InLongMsgKv.java      |  2 +-
 .../InLongMsgKvMixedFormatConverter.java           |  2 +-
 .../sort/formats/inlongmsgkv/InLongMsgKvTest.java  |  2 +-
 .../InLongMsgBinlogFormatFactory.java              |  2 +-
 ...nLongMsgBinlogRowDataDeserializationSchema.java |  2 +-
 .../InLongMsgBinlogFormatFactoryTest.java          | 10 ++--
 20 files changed, 367 insertions(+), 33 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index 575c99fd01..abbd5c149b 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -17,28 +17,82 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 /**
  * Csv deserialization info
  */
-public class CsvDeserializationInfo implements DeserializationInfo {
+public class CsvDeserializationInfo extends InLongMsgDeserializationInfo {
 
-    private static final long serialVersionUID = -5035426390567887081L;
+    private static final long serialVersionUID = 7424482369272150638L;
 
     private final char splitter;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Character escapeChar;
+
+    private final String tid;
+
     // TODO: support mapping index to field
+    public CsvDeserializationInfo(
+            @JsonProperty("splitter") char splitter) {
+        this(TID_DEFAULT_VALUE, splitter, null);
+    }
+
+    public CsvDeserializationInfo(
+            @JsonProperty("splitter") char splitter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        this(TID_DEFAULT_VALUE, splitter, escapeChar);
+    }
 
     @JsonCreator
     public CsvDeserializationInfo(
-            @JsonProperty("splitter") char splitter) {
+            @JsonProperty("tid") String tid,
+            @JsonProperty("splitter") char splitter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        super(tid);
+        this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
         this.splitter = splitter;
+        this.escapeChar = escapeChar;
     }
 
     @JsonProperty("splitter")
     public char getSplitter() {
         return splitter;
     }
+
+    @JsonProperty("escape_char")
+    @Nullable
+    public Character getEscapeChar() {
+        return escapeChar;
+    }
+
+    @JsonProperty("tid")
+    public String getTid() {
+        return tid;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        CsvDeserializationInfo other = (CsvDeserializationInfo) o;
+        return Objects.equals(tid, other.getTid()) && splitter == 
other.splitter
+                && Objects.equals(escapeChar, other.escapeChar);
+    }
+
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index b598d975ea..dfe68472fd 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -41,4 +41,5 @@ import java.io.Serializable;
 })
 public interface DeserializationInfo extends Serializable {
 
+    String TID_DEFAULT_VALUE = "-";
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 0806717fbf..7b45ed0026 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -18,8 +18,13 @@
 package org.apache.inlong.sort.protocol.deserialization;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 /**
  * It represents CSV2 format of InLongMsg(m=9).
  */
@@ -29,16 +34,51 @@ public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationIn
 
     private final char delimiter;
 
-    @JsonCreator
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Character escapeChar;
+
     public InLongMsgCsv2DeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter) {
+        this(tid, delimiter, null);
+    }
+
+    @JsonCreator
+    public InLongMsgCsv2DeserializationInfo(
+            @JsonProperty("tid") String tid,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar) {
         super(tid);
         this.delimiter = delimiter;
+        this.escapeChar = escapeChar;
     }
 
     @JsonProperty("delimiter")
     public char getDelimiter() {
         return delimiter;
     }
-}
+
+    @JsonProperty("escape_char")
+    @Nullable
+    public Character getEscapeChar() {
+        return escapeChar;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        InLongMsgCsv2DeserializationInfo other = 
(InLongMsgCsv2DeserializationInfo) o;
+        return super.equals(other)
+                && delimiter == other.delimiter
+                && Objects.equals(escapeChar, other.escapeChar);
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 890cea3aac..79ec62f9f7 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -22,6 +22,10 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 /**
  * It represents CSV format of InLongMsg(m=0).
  */
@@ -31,22 +35,35 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
 
     private final char delimiter;
 
+    @JsonInclude(Include.NON_NULL)
+    @Nullable
+    private final Character escapeChar;
+
     @JsonInclude(Include.NON_NULL)
     private final boolean deleteHeadDelimiter;
 
     public InLongMsgCsvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter) {
-        this(tid, delimiter, true);
+        this(tid, delimiter, null, false);
+    }
+
+    public InLongMsgCsvDeserializationInfo(
+            @JsonProperty("tid") String tid,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("delete_head_delimiter") boolean 
deleteHeadDelimiter) {
+        this(tid, delimiter, null, deleteHeadDelimiter);
     }
 
     @JsonCreator
     public InLongMsgCsvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
             @JsonProperty("delete_head_delimiter") boolean 
deleteHeadDelimiter) {
         super(tid);
         this.delimiter = delimiter;
+        this.escapeChar = escapeChar;
         this.deleteHeadDelimiter = deleteHeadDelimiter;
     }
 
@@ -55,8 +72,31 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
         return delimiter;
     }
 
+    @JsonProperty("escape_char")
+    @Nullable
+    public Character getEscapeChar() {
+        return escapeChar;
+    }
+
     @JsonProperty("delete_head_delimiter")
     public boolean isDeleteHeadDelimiter() {
         return deleteHeadDelimiter;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        InLongMsgCsvDeserializationInfo other = 
(InLongMsgCsvDeserializationInfo) o;
+        return super.equals(other)
+                && delimiter == other.delimiter
+                && Objects.equals(escapeChar, other.escapeChar)
+                && deleteHeadDelimiter == other.deleteHeadDelimiter;
+    }
 }
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 47d74cbfd1..1a5c8ff67e 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -17,8 +17,15 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 /**
  * It represents KV format of InLongMsg(m=5).
  */
@@ -30,13 +37,33 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
 
     private final char kvDelimiter;
 
+    @JsonInclude(Include.NON_NULL)
+    @Nullable
+    private final Character escapeChar;
+
+    @JsonInclude(Include.NON_NULL)
+    @Nullable
+    private final Character lineDelimiter;
+
     public InLongMsgKvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter) {
+        this(tid, entryDelimiter, kvDelimiter, null, null);
+    }
+
+    @JsonCreator
+    public InLongMsgKvDeserializationInfo(
+            @JsonProperty("tid") String tid,
+            @JsonProperty("entry_delimiter") char entryDelimiter,
+            @JsonProperty("kv_delimiter") char kvDelimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("line_delimiter") @Nullable Character lineDelimiter) 
{
         super(tid);
         this.entryDelimiter = entryDelimiter;
         this.kvDelimiter = kvDelimiter;
+        this.escapeChar = escapeChar;
+        this.lineDelimiter = lineDelimiter == null ? '\n' : lineDelimiter;
     }
 
     @JsonProperty("entry_delimiter")
@@ -48,4 +75,34 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
     public char getKvDelimiter() {
         return kvDelimiter;
     }
-}
+
+    @JsonProperty("escape_char")
+    @Nullable
+    public Character getEscapeChar() {
+        return escapeChar;
+    }
+
+    @JsonProperty("line_delimiter")
+    @Nullable
+    public Character getLineDelimiter() {
+        return lineDelimiter;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        InLongMsgKvDeserializationInfo other = 
(InLongMsgKvDeserializationInfo) o;
+        return super.equals(other)
+                && entryDelimiter == other.entryDelimiter
+                && kvDelimiter == other.kvDelimiter
+                && Objects.equals(escapeChar, other.escapeChar)
+                && Objects.equals(lineDelimiter, other.lineDelimiter);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index 53426ff060..fbfb95c1fd 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -18,8 +18,14 @@
 package org.apache.inlong.sort.protocol.deserialization;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 /**
  * It represents TLog CSV format of InLongMsg(m=10).
  */
@@ -29,16 +35,51 @@ public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializatio
 
     private final char delimiter;
 
-    @JsonCreator
+    @JsonInclude(Include.NON_NULL)
+    @Nullable
+    private final Character escapeChar;
+
     public InLongMsgTlogCsvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter) {
+        this(tid, delimiter, null);
+    }
+
+    @JsonCreator
+    public InLongMsgTlogCsvDeserializationInfo(
+            @JsonProperty("tid") String tid,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar) {
         super(tid);
         this.delimiter = delimiter;
+        this.escapeChar = escapeChar;
     }
 
     @JsonProperty("delimiter")
     public char getDelimiter() {
         return delimiter;
     }
-}
+
+    @JsonProperty("escape_char")
+    @Nullable
+    public Character getEscapeChar() {
+        return escapeChar;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        InLongMsgTlogCsvDeserializationInfo other = 
(InLongMsgTlogCsvDeserializationInfo) o;
+        return super.equals(other)
+                && delimiter == other.delimiter
+                && Objects.equals(escapeChar, other.escapeChar);
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index 4c9bd16ea4..f3b2985128 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -17,8 +17,15 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 /**
  * It represents TLog KV format of InLongMsg(m=15).
  */
@@ -32,15 +39,30 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
 
     private final char kvDelimiter;
 
+    @JsonInclude(Include.NON_NULL)
+    @Nullable
+    private final Character escapeChar;
+
     public InLongMsgTlogKvDeserializationInfo(
             @JsonProperty("tid") String tid,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter) {
+        this(tid, delimiter, entryDelimiter, kvDelimiter, null);
+    }
+
+    @JsonCreator
+    public InLongMsgTlogKvDeserializationInfo(
+            @JsonProperty("tid") String tid,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("entry_delimiter") char entryDelimiter,
+            @JsonProperty("kv_delimiter") char kvDelimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar) {
         super(tid);
         this.delimiter = delimiter;
         this.entryDelimiter = entryDelimiter;
         this.kvDelimiter = kvDelimiter;
+        this.escapeChar = escapeChar;
     }
 
     @JsonProperty("delimiter")
@@ -57,4 +79,28 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
     public char getKvDelimiter() {
         return kvDelimiter;
     }
-}
+
+    @JsonProperty("escape_char")
+    @Nullable
+    public Character getEscapeChar() {
+        return escapeChar;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        InLongMsgTlogKvDeserializationInfo other = 
(InLongMsgTlogKvDeserializationInfo) o;
+        return super.equals(other)
+                && delimiter == other.delimiter
+                && entryDelimiter == other.entryDelimiter
+                && kvDelimiter == other.kvDelimiter
+                && Objects.equals(escapeChar, other.escapeChar);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index 73529a14de..c15839f50b 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -17,28 +17,56 @@
 
 package org.apache.inlong.sort.protocol.deserialization;
 
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
 
 /**
  * Kv deserialization info
  */
-public class KvDeserializationInfo implements DeserializationInfo {
+public class KvDeserializationInfo extends InLongMsgDeserializationInfo {
 
-    private static final long serialVersionUID = 1976031542480774581L;
+    private static final long serialVersionUID = -3182881360079888043L;
 
     private final char entrySplitter;
 
     private final char kvSplitter;
 
-    @JsonCreator
+    private final String tid;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final Character escapeChar;
+
     public KvDeserializationInfo(
             @JsonProperty("entry_splitter") char entrySplitter,
             @JsonProperty("kv_splitter") char kvSplitter) {
-        this.entrySplitter = checkNotNull(entrySplitter);
-        this.kvSplitter = checkNotNull(kvSplitter);
+        this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, null);
+    }
+
+    public KvDeserializationInfo(
+            @JsonProperty("entry_splitter") char entrySplitter,
+            @JsonProperty("kv_splitter") char kvSplitter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, escapeChar);
+    }
+
+    @JsonCreator
+    public KvDeserializationInfo(
+            @JsonProperty("tid") String tid,
+            @JsonProperty("entry_splitter") char entrySplitter,
+            @JsonProperty("kv_splitter") char kvSplitter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        super(tid);
+        this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
+        this.entrySplitter = entrySplitter;
+        this.kvSplitter = kvSplitter;
+        this.escapeChar = escapeChar;
     }
 
     @JsonProperty("entry_splitter")
@@ -50,4 +78,31 @@ public class KvDeserializationInfo implements 
DeserializationInfo {
     public char getKvSplitter() {
         return kvSplitter;
     }
+
+    @JsonProperty("escape_char")
+    @Nullable
+    public Character getEscapeChar() {
+        return escapeChar;
+    }
+
+    @JsonProperty("tid")
+    public String getTid() {
+        return tid;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        KvDeserializationInfo other = (KvDeserializationInfo) o;
+        return Objects.equals(tid, other.getTid()) && entrySplitter == 
other.entrySplitter
+                && kvSplitter == other.kvSplitter
+                && Objects.equals(escapeChar, other.escapeChar);
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
 
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
index 307cd7edfd..e84c8f9ea9 100644
--- 
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
+++ 
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
@@ -30,7 +30,7 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DE
  */
 public class Csv extends TextFormatDescriptor<Csv> {
 
-    public static final String FORMAT_TYPE_VALUE = "InLong-CSV";
+    public static final String FORMAT_TYPE_VALUE = "inlong-csv";
 
     public Csv() {
         super(FORMAT_TYPE_VALUE, 1);
diff --git 
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
 
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
index d3d90ad8d2..93726d4632 100644
--- 
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
@@ -92,7 +92,7 @@ public class CsvTest extends DescriptorTestBase {
     @Override
     public List<Map<String, String>> properties() {
         final Map<String, String> props1 = new HashMap<>();
-        props1.put("format.type", "InLong-CSV");
+        props1.put("format.type", "inlong-csv");
         props1.put("format.property-version", "1");
         props1.put("format.schema", TEST_SCHEMA);
         props1.put("format.delimiter", ";");
@@ -102,7 +102,7 @@ public class CsvTest extends DescriptorTestBase {
         props1.put("format.null-literal", "n/a");
 
         final Map<String, String> props2 = new HashMap<>();
-        props2.put("format.type", "InLong-CSV");
+        props2.put("format.type", "inlong-csv");
         props2.put("format.property-version", "1");
         props2.put("format.derive-schema", "true");
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
index 7875f526ac..02fad5a8eb 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
@@ -41,7 +41,7 @@ import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtil
  */
 public class InLongMsgBinlog extends FormatDescriptor {
 
-    public static final String FORMAT_TYPE_VALUE = "InLongMsg-Binlog";
+    public static final String FORMAT_TYPE_VALUE = "inlong-msg-binlog";
 
     private DescriptorProperties internalProperties = new 
DescriptorProperties(true);
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
index 7b970d3a20..29a27b7d13 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
@@ -76,7 +76,7 @@ public class InLongMsgBinlogTest extends DescriptorTestBase {
     @Override
     public List<Map<String, String>> properties() {
         final Map<String, String> props1 = new HashMap<>();
-        props1.put("format.type", "InLongMsg-Binlog");
+        props1.put("format.type", "inlong-msg-binlog");
         props1.put("format.property-version", "1");
         props1.put("format.schema", marshall(TEST_SCHEMA));
         props1.put("format.time-field-name", "time");
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
index 1567ea0d3e..96e807a327 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
@@ -36,7 +36,7 @@ import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORM
  */
 public class InLongMsgCsv extends TextFormatDescriptor<InLongMsgCsv> {
 
-    public static final String FORMAT_TYPE_VALUE = "InLongMsg-CSV";
+    public static final String FORMAT_TYPE_VALUE = "inlong-msg-csv";
 
     public InLongMsgCsv() {
         super(FORMAT_TYPE_VALUE, 1);
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
index 72acf4486b..e23967dabb 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
@@ -88,7 +88,7 @@ public class InLongMsgCsvTest extends DescriptorTestBase {
     @Override
     public List<Map<String, String>> properties() {
         final Map<String, String> props1 = new HashMap<>();
-        props1.put("format.type", "InLongMsg-CSV");
+        props1.put("format.type", "inlong-msg-csv");
         props1.put("format.property-version", "1");
         props1.put("format.schema", TEST_SCHEMA);
         props1.put("format.time-field-name", "time");
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
index 52eb340ae1..1a1b93cc16 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
@@ -32,7 +32,7 @@ import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIM
  */
 public class InLongMsgKv extends TextFormatDescriptor<InLongMsgKv> {
 
-    public static final String FORMAT_TYPE_VALUE = "InLongMsg-KV";
+    public static final String FORMAT_TYPE_VALUE = "inlong-msg-kv";
 
     public InLongMsgKv() {
         super(FORMAT_TYPE_VALUE, 1);
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index 59195b87e1..a46fa78459 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -37,7 +37,7 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * Converter used to deserialize a mixed row in InLongMsg-kv format.
+ * Converter used to deserialize a mixed row in inlong-msg-kv format.
  */
 public class InLongMsgKvMixedFormatConverter extends 
AbstractInLongMsgMixedFormatConverter {
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
index 8b84fa125c..d384ac306c 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
@@ -88,7 +88,7 @@ public class InLongMsgKvTest extends DescriptorTestBase {
     @Override
     public List<Map<String, String>> properties() {
         final Map<String, String> props1 = new HashMap<>();
-        props1.put("format.type", "InLongMsg-KV");
+        props1.put("format.type", "inlong-msg-kv");
         props1.put("format.property-version", "1");
         props1.put("format.schema", TEST_SCHEMA);
         props1.put("format.time-field-name", "time");
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
index af3e7d320d..447bafcf57 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
@@ -44,7 +44,7 @@ import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIE
  */
 public final class InLongMsgBinlogFormatFactory implements 
DeserializationFormatFactory {
 
-    public static final String IDENTIFIER = "InLongMsg-Binlog";
+    public static final String IDENTIFIER = "inlong-msg-binlog";
 
     @Override
     public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
index ade0705f0b..3d0d88d401 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
@@ -29,7 +29,7 @@ import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_AT
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 
 /**
- * Deserialization schema from InLongMsg-Binlog to Flink Table & SQL internal 
data structures.
+ * Deserialization schema from inlong-msg-binlog to Flink Table & SQL internal 
data structures.
  */
 public class InLongMsgBinlogRowDataDeserializationSchema extends 
AbstractInLongMsgDeserializationSchema {
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
index 8b27c74137..ccb05d7f08 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
@@ -109,11 +109,11 @@ public class InLongMsgBinlogFormatFactoryTest {
         options.put("buffer-size", "1000");
 
         options.put("format", InLongMsgBinlogFormatFactory.IDENTIFIER);
-        options.put("InLongMsg-Binlog.row.format.info", 
FormatUtils.marshall(testFormatInfo));
-        options.put("InLongMsg-Binlog.format.time-field-name", "time");
-        options.put("InLongMsg-Binlog.format.attribute-field-name", 
"attributes");
-        options.put("InLongMsg-Binlog.format.ignore-errors", "true");
-        options.put("InLongMsg-Binlog.format.include-update-before", "false");
+        options.put("inlong-msg-binlog.row.format.info", 
FormatUtils.marshall(testFormatInfo));
+        options.put("inlong-msg-binlog.format.time-field-name", "time");
+        options.put("inlong-msg-binlog.format.attribute-field-name", 
"attributes");
+        options.put("inlong-msg-binlog.format.ignore-errors", "true");
+        options.put("inlong-msg-binlog.format.include-update-before", "false");
         return options;
     }
 }


Reply via email to