This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f5e57186c [INLONG-11821][Sort] kv and csv deserialization configuration support whether to remove and automatically add escape configuration (#11822)
5f5e57186c is described below

commit 5f5e57186cf01f13cedb20c7aafa7a81b285948f
Author: Mingyu Bao <[email protected]>
AuthorDate: Wed Apr 16 10:00:17 2025 +0800

    [INLONG-11821][Sort] kv and csv deserialization configuration support whether to remove and automatically add escape configuration (#11822)
---
 .../inlong/sort/protocol/constant/Constant.java    |  4 ++
 .../deserialization/CsvDeserializationInfo.java    | 47 +++++++++++++++++++--
 .../InLongMsgCsv2DeserializationInfo.java          | 49 +++++++++++++++++++++-
 .../InLongMsgCsvDeserializationInfo.java           | 48 ++++++++++++++++++++-
 .../InLongMsgKvDeserializationInfo.java            | 49 +++++++++++++++++++++-
 .../InLongMsgTlogCsvDeserializationInfo.java       | 46 +++++++++++++++++++-
 .../InLongMsgTlogKvDeserializationInfo.java        | 49 +++++++++++++++++++++-
 .../deserialization/KvDeserializationInfo.java     | 48 ++++++++++++++++++++-
 8 files changed, 329 insertions(+), 11 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
index b425cfea60..092c8fa201 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
@@ -40,4 +40,8 @@ public class Constant {
      * The multiple table-pattern of sink
      */
     public static final String SINK_MULTIPLE_TABLE_PATTERN = 
"sink.multiple.table-pattern";
+
+    public static final Boolean DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT = 
true;
+
+    public static final Boolean 
AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT = false;
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index e43a2f129f..abc2310ccb 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -26,6 +26,9 @@ import javax.annotation.Nullable;
 
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static 
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
 /**
  * Csv deserialization info
  */
@@ -41,27 +44,43 @@ public class CsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
     private final String streamId;
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    private final Boolean deleteEscapeCharWhileDes;
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    private final Boolean autoAppendEscapeCharAfterDes;
+
     // TODO: support mapping index to field
     public CsvDeserializationInfo(
             @JsonProperty("splitter") char splitter) {
-        this(STREAM_ID_DEFAULT_VALUE, splitter, null);
+        this(STREAM_ID_DEFAULT_VALUE, splitter, null,
+                DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
     }
 
     public CsvDeserializationInfo(
             @JsonProperty("splitter") char splitter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
-        this(STREAM_ID_DEFAULT_VALUE, splitter, escapeChar);
+        this(STREAM_ID_DEFAULT_VALUE, splitter, escapeChar,
+                DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
     }
 
     @JsonCreator
     public CsvDeserializationInfo(
             @JsonProperty("streamId") String streamId,
             @JsonProperty("splitter") char splitter,
-            @JsonProperty("escape_char") @Nullable Character escapeChar) {
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("delete_escape_char_while_deserialize") @Nullable 
Boolean deleteEscapeCharWhileDes,
+            @JsonProperty("auto_append_escape_char_after_deserialize") 
@Nullable Boolean autoAppendEscapeCharAfterDes) {
         super(streamId);
         this.streamId = (StringUtils.isEmpty(streamId) ? 
STREAM_ID_DEFAULT_VALUE : streamId);
         this.splitter = splitter;
         this.escapeChar = escapeChar;
+        this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+        this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
     }
 
     @JsonProperty("splitter")
@@ -80,6 +99,24 @@ public class CsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
         return streamId;
     }
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    public Boolean getDeleteEscapeCharWhileDes() {
+        if (deleteEscapeCharWhileDes != null) {
+            return deleteEscapeCharWhileDes;
+        }
+        return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+    }
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    public Boolean getAutoAppendEscapeCharAfterDes() {
+        if (autoAppendEscapeCharAfterDes != null) {
+            return autoAppendEscapeCharAfterDes;
+        }
+        return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -92,7 +129,9 @@ public class CsvDeserializationInfo extends 
InLongMsgDeserializationInfo {
 
         CsvDeserializationInfo other = (CsvDeserializationInfo) o;
         return Objects.equals(streamId, other.getStreamId()) && splitter == 
other.splitter
-                && Objects.equals(escapeChar, other.escapeChar);
+                && Objects.equals(escapeChar, other.escapeChar)
+                && Objects.equals(deleteEscapeCharWhileDes, 
other.deleteEscapeCharWhileDes)
+                && Objects.equals(autoAppendEscapeCharAfterDes, 
other.autoAppendEscapeCharAfterDes);
     }
 
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index cb892c64fb..a1b8e7589c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -27,6 +27,9 @@ import javax.annotation.Nullable;
 
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static 
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
 /**
  * It represents CSV2 format of InLongMsg(m=9).
  */
@@ -41,10 +44,19 @@ public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationIn
     @Nullable
     private final Character escapeChar;
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    private final Boolean deleteEscapeCharWhileDes;
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    private final Boolean autoAppendEscapeCharAfterDes;
+
     public InLongMsgCsv2DeserializationInfo(
             @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter) {
-        this(streamId, delimiter, null);
+        this(streamId, delimiter, null, 
DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
     }
 
     @JsonCreator
@@ -52,9 +64,22 @@ public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationIn
             @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        this(streamId, delimiter, escapeChar, 
DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+    }
+
+    @JsonCreator
+    public InLongMsgCsv2DeserializationInfo(
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("delete_escape_char_while_deserialize") @Nullable 
Boolean deleteEscapeCharWhileDes,
+            @JsonProperty("auto_append_escape_char_after_deserialize") 
@Nullable Boolean autoAppendEscapeCharAfterDes) {
         super(streamId);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
+        this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+        this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
     }
 
     @JsonProperty("delimiter")
@@ -68,6 +93,24 @@ public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationIn
         return escapeChar;
     }
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    public Boolean getDeleteEscapeCharWhileDes() {
+        if (deleteEscapeCharWhileDes != null) {
+            return deleteEscapeCharWhileDes;
+        }
+        return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+    }
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    public Boolean getAutoAppendEscapeCharAfterDes() {
+        if (autoAppendEscapeCharAfterDes != null) {
+            return autoAppendEscapeCharAfterDes;
+        }
+        return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -81,7 +124,9 @@ public class InLongMsgCsv2DeserializationInfo extends 
InLongMsgDeserializationIn
         InLongMsgCsv2DeserializationInfo other = 
(InLongMsgCsv2DeserializationInfo) o;
         return super.equals(other)
                 && delimiter == other.delimiter
-                && Objects.equals(escapeChar, other.escapeChar);
+                && Objects.equals(escapeChar, other.escapeChar)
+                && Objects.equals(deleteEscapeCharWhileDes, 
other.deleteEscapeCharWhileDes)
+                && Objects.equals(autoAppendEscapeCharAfterDes, 
other.autoAppendEscapeCharAfterDes);
     }
 
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 197be48b47..341d8dfaae 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
 
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static 
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
 /**
  * It represents CSV format of InLongMsg(m=0).
  */
@@ -45,6 +48,14 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
     @JsonInclude(Include.NON_NULL)
     private final boolean deleteHeadDelimiter;
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    private final Boolean deleteEscapeCharWhileDes;
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    private final Boolean autoAppendEscapeCharAfterDes;
+
     public InLongMsgCsvDeserializationInfo(
             @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter) {
@@ -64,10 +75,25 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar,
             @JsonProperty("delete_head_delimiter") boolean 
deleteHeadDelimiter) {
+        this(streamId, delimiter, escapeChar, deleteHeadDelimiter,
+                DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+    }
+
+    @JsonCreator
+    public InLongMsgCsvDeserializationInfo(
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter,
+            @JsonProperty("delete_escape_char_while_deserialize") @Nullable 
Boolean deleteEscapeCharWhileDes,
+            @JsonProperty("auto_append_escape_char_after_deserialize") 
@Nullable Boolean autoAppendEscapeCharAfterDes) {
         super(streamId);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
         this.deleteHeadDelimiter = deleteHeadDelimiter;
+        this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+        this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
     }
 
     @JsonProperty("delimiter")
@@ -86,6 +112,24 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
         return deleteHeadDelimiter;
     }
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    public Boolean getDeleteEscapeCharWhileDes() {
+        if (deleteEscapeCharWhileDes != null) {
+            return deleteEscapeCharWhileDes;
+        }
+        return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+    }
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    public Boolean getAutoAppendEscapeCharAfterDes() {
+        if (autoAppendEscapeCharAfterDes != null) {
+            return autoAppendEscapeCharAfterDes;
+        }
+        return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -100,6 +144,8 @@ public class InLongMsgCsvDeserializationInfo extends 
InLongMsgDeserializationInf
         return super.equals(other)
                 && delimiter == other.delimiter
                 && Objects.equals(escapeChar, other.escapeChar)
-                && deleteHeadDelimiter == other.deleteHeadDelimiter;
+                && deleteHeadDelimiter == other.deleteHeadDelimiter
+                && Objects.equals(deleteEscapeCharWhileDes, 
other.deleteEscapeCharWhileDes)
+                && Objects.equals(autoAppendEscapeCharAfterDes, 
other.autoAppendEscapeCharAfterDes);
     }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index de999da6ff..406045195b 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
 
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static 
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
 /**
  * It represents KV format of InLongMsg(m=5).
  */
@@ -48,6 +51,14 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
     @Nullable
     private final Character lineDelimiter;
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    private final Boolean deleteEscapeCharWhileDes;
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    private final Boolean autoAppendEscapeCharAfterDes;
+
     public InLongMsgKvDeserializationInfo(
             @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("entry_delimiter") char entryDelimiter,
@@ -62,11 +73,27 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
             @JsonProperty("kv_delimiter") char kvDelimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar,
             @JsonProperty("line_delimiter") @Nullable Character lineDelimiter) 
{
+        this(streamId, entryDelimiter, kvDelimiter, escapeChar, lineDelimiter,
+                DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+    }
+
+    @JsonCreator
+    public InLongMsgKvDeserializationInfo(
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
+            @JsonProperty("entry_delimiter") char entryDelimiter,
+            @JsonProperty("kv_delimiter") char kvDelimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("line_delimiter") @Nullable Character lineDelimiter,
+            @JsonProperty("delete_escape_char_while_deserialize") @Nullable 
Boolean deleteEscapeCharWhileDes,
+            @JsonProperty("auto_append_escape_char_after_deserialize") 
@Nullable Boolean autoAppendEscapeCharAfterDes) {
         super(streamId);
         this.entryDelimiter = entryDelimiter;
         this.kvDelimiter = kvDelimiter;
         this.escapeChar = escapeChar;
         this.lineDelimiter = lineDelimiter == null ? '\n' : lineDelimiter;
+        this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+        this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
     }
 
     @JsonProperty("entry_delimiter")
@@ -91,6 +118,24 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
         return lineDelimiter;
     }
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    public Boolean getDeleteEscapeCharWhileDes() {
+        if (deleteEscapeCharWhileDes != null) {
+            return deleteEscapeCharWhileDes;
+        }
+        return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+    }
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    public Boolean getAutoAppendEscapeCharAfterDes() {
+        if (autoAppendEscapeCharAfterDes != null) {
+            return autoAppendEscapeCharAfterDes;
+        }
+        return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -106,6 +151,8 @@ public class InLongMsgKvDeserializationInfo extends 
InLongMsgDeserializationInfo
                 && entryDelimiter == other.entryDelimiter
                 && kvDelimiter == other.kvDelimiter
                 && Objects.equals(escapeChar, other.escapeChar)
-                && Objects.equals(lineDelimiter, other.lineDelimiter);
+                && Objects.equals(lineDelimiter, other.lineDelimiter)
+                && Objects.equals(deleteEscapeCharWhileDes, 
other.deleteEscapeCharWhileDes)
+                && Objects.equals(autoAppendEscapeCharAfterDes, 
other.autoAppendEscapeCharAfterDes);
     }
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index ba935da023..10fe782336 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
 
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static 
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
 /**
  * It represents TLog CSV format of InLongMsg(m=10).
  */
@@ -42,6 +45,14 @@ public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializatio
     @Nullable
     private final Character escapeChar;
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    private final Boolean deleteEscapeCharWhileDes;
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    private final Boolean autoAppendEscapeCharAfterDes;
+
     public InLongMsgTlogCsvDeserializationInfo(
             @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter) {
@@ -53,9 +64,22 @@ public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializatio
             @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        this(streamId, delimiter, escapeChar, 
DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+    }
+
+    @JsonCreator
+    public InLongMsgTlogCsvDeserializationInfo(
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("delete_escape_char_while_deserialize") @Nullable 
Boolean deleteEscapeCharWhileDes,
+            @JsonProperty("auto_append_escape_char_after_deserialize") 
@Nullable Boolean autoAppendEscapeCharAfterDes) {
         super(streamId);
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
+        this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+        this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
     }
 
     @JsonProperty("delimiter")
@@ -69,6 +93,24 @@ public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializatio
         return escapeChar;
     }
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    public Boolean getDeleteEscapeCharWhileDes() {
+        if (deleteEscapeCharWhileDes != null) {
+            return deleteEscapeCharWhileDes;
+        }
+        return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+    }
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    public Boolean getAutoAppendEscapeCharAfterDes() {
+        if (autoAppendEscapeCharAfterDes != null) {
+            return autoAppendEscapeCharAfterDes;
+        }
+        return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -82,7 +124,9 @@ public class InLongMsgTlogCsvDeserializationInfo extends 
InLongMsgDeserializatio
         InLongMsgTlogCsvDeserializationInfo other = 
(InLongMsgTlogCsvDeserializationInfo) o;
         return super.equals(other)
                 && delimiter == other.delimiter
-                && Objects.equals(escapeChar, other.escapeChar);
+                && Objects.equals(escapeChar, other.escapeChar)
+                && Objects.equals(deleteEscapeCharWhileDes, 
other.deleteEscapeCharWhileDes)
+                && Objects.equals(autoAppendEscapeCharAfterDes, 
other.autoAppendEscapeCharAfterDes);
     }
 
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index 6ec72fdd7a..7726818498 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
 
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static 
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
 /**
  * It represents TLog KV format of InLongMsg(m=15).
  */
@@ -46,6 +49,14 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
     @Nullable
     private final Character escapeChar;
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    private final Boolean deleteEscapeCharWhileDes;
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    private final Boolean autoAppendEscapeCharAfterDes;
+
     public InLongMsgTlogKvDeserializationInfo(
             @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
             @JsonProperty("delimiter") char delimiter,
@@ -61,11 +72,27 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
             @JsonProperty("entry_delimiter") char entryDelimiter,
             @JsonProperty("kv_delimiter") char kvDelimiter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        this(streamId, delimiter, entryDelimiter, kvDelimiter, escapeChar,
+                DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+    }
+
+    @JsonCreator
+    public InLongMsgTlogKvDeserializationInfo(
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
+            @JsonProperty("delimiter") char delimiter,
+            @JsonProperty("entry_delimiter") char entryDelimiter,
+            @JsonProperty("kv_delimiter") char kvDelimiter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("delete_escape_char_while_deserialize") @Nullable 
Boolean deleteEscapeCharWhileDes,
+            @JsonProperty("auto_append_escape_char_after_deserialize") 
@Nullable Boolean autoAppendEscapeCharAfterDes) {
         super(streamId);
         this.delimiter = delimiter;
         this.entryDelimiter = entryDelimiter;
         this.kvDelimiter = kvDelimiter;
         this.escapeChar = escapeChar;
+        this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+        this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
     }
 
     @JsonProperty("delimiter")
@@ -89,6 +116,24 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
         return escapeChar;
     }
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    public Boolean getDeleteEscapeCharWhileDes() {
+        if (deleteEscapeCharWhileDes != null) {
+            return deleteEscapeCharWhileDes;
+        }
+        return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+    }
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    public Boolean getAutoAppendEscapeCharAfterDes() {
+        if (autoAppendEscapeCharAfterDes != null) {
+            return autoAppendEscapeCharAfterDes;
+        }
+        return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -104,6 +149,8 @@ public class InLongMsgTlogKvDeserializationInfo extends 
InLongMsgDeserialization
                 && delimiter == other.delimiter
                 && entryDelimiter == other.entryDelimiter
                 && kvDelimiter == other.kvDelimiter
-                && Objects.equals(escapeChar, other.escapeChar);
+                && Objects.equals(escapeChar, other.escapeChar)
+                && Objects.equals(deleteEscapeCharWhileDes, 
other.deleteEscapeCharWhileDes)
+                && Objects.equals(autoAppendEscapeCharAfterDes, 
other.autoAppendEscapeCharAfterDes);
     }
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index dd14eaebdd..ad22b35f53 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
 
 import java.util.Objects;
 
+import static 
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static 
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
 /**
  * Kv deserialization info
  */
@@ -46,6 +49,14 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
     @Nullable
     private final Character escapeChar;
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    private final Boolean deleteEscapeCharWhileDes;
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    private final Boolean autoAppendEscapeCharAfterDes;
+
     public KvDeserializationInfo(
             @JsonProperty("entry_splitter") char entrySplitter,
             @JsonProperty("kv_splitter") char kvSplitter) {
@@ -65,11 +76,26 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
             @JsonProperty("entry_splitter") char entrySplitter,
             @JsonProperty("kv_splitter") char kvSplitter,
             @JsonProperty("escape_char") @Nullable Character escapeChar) {
+        this(streamId, entrySplitter, kvSplitter, escapeChar,
+                DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+                AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+    }
+
+    @JsonCreator
+    public KvDeserializationInfo(
+            @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String 
streamId,
+            @JsonProperty("entry_splitter") char entrySplitter,
+            @JsonProperty("kv_splitter") char kvSplitter,
+            @JsonProperty("escape_char") @Nullable Character escapeChar,
+            @JsonProperty("delete_escape_char_while_deserialize") @Nullable 
Boolean deleteEscapeCharWhileDes,
+            @JsonProperty("auto_append_escape_char_after_deserialize") 
@Nullable Boolean autoAppendEscapeCharAfterDes) {
         super(streamId);
         this.streamId = (StringUtils.isEmpty(streamId) ? 
STREAM_ID_DEFAULT_VALUE : streamId);
         this.entrySplitter = entrySplitter;
         this.kvSplitter = kvSplitter;
         this.escapeChar = escapeChar;
+        this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+        this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
     }
 
     @JsonProperty("entry_splitter")
@@ -93,6 +119,24 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
         return streamId;
     }
 
+    @JsonProperty("delete_escape_char_while_deserialize")
+    @Nullable
+    public Boolean getDeleteEscapeCharWhileDes() {
+        if (deleteEscapeCharWhileDes != null) {
+            return deleteEscapeCharWhileDes;
+        }
+        return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+    }
+
+    @JsonProperty("auto_append_escape_char_after_deserialize")
+    @Nullable
+    public Boolean getAutoAppendEscapeCharAfterDes() {
+        if (autoAppendEscapeCharAfterDes != null) {
+            return autoAppendEscapeCharAfterDes;
+        }
+        return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -106,6 +150,8 @@ public class KvDeserializationInfo extends 
InLongMsgDeserializationInfo {
         KvDeserializationInfo other = (KvDeserializationInfo) o;
         return Objects.equals(streamId, other.getStreamId()) && entrySplitter 
== other.entrySplitter
                 && kvSplitter == other.kvSplitter
-                && Objects.equals(escapeChar, other.escapeChar);
+                && Objects.equals(escapeChar, other.escapeChar)
+                && Objects.equals(deleteEscapeCharWhileDes, 
other.deleteEscapeCharWhileDes)
+                && Objects.equals(autoAppendEscapeCharAfterDes, 
other.autoAppendEscapeCharAfterDes);
     }
 }

Reply via email to