This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5f5e57186c [INLONG-11821][Sort] kv and csv deserialization
configuration support whether to remove and automatically add escape
configuration (#11822)
5f5e57186c is described below
commit 5f5e57186cf01f13cedb20c7aafa7a81b285948f
Author: Mingyu Bao <[email protected]>
AuthorDate: Wed Apr 16 10:00:17 2025 +0800
[INLONG-11821][Sort] kv and csv deserialization configuration support
whether to remove and automatically add escape configuration (#11822)
---
.../inlong/sort/protocol/constant/Constant.java | 4 ++
.../deserialization/CsvDeserializationInfo.java | 47 +++++++++++++++++++--
.../InLongMsgCsv2DeserializationInfo.java | 49 +++++++++++++++++++++-
.../InLongMsgCsvDeserializationInfo.java | 48 ++++++++++++++++++++-
.../InLongMsgKvDeserializationInfo.java | 49 +++++++++++++++++++++-
.../InLongMsgTlogCsvDeserializationInfo.java | 46 +++++++++++++++++++-
.../InLongMsgTlogKvDeserializationInfo.java | 49 +++++++++++++++++++++-
.../deserialization/KvDeserializationInfo.java | 48 ++++++++++++++++++++-
8 files changed, 329 insertions(+), 11 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
index b425cfea60..092c8fa201 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/Constant.java
@@ -40,4 +40,8 @@ public class Constant {
* The multiple table-pattern of sink
*/
public static final String SINK_MULTIPLE_TABLE_PATTERN =
"sink.multiple.table-pattern";
+
+ public static final Boolean DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT =
true;
+
+ public static final Boolean
AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT = false;
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index e43a2f129f..abc2310ccb 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -26,6 +26,9 @@ import javax.annotation.Nullable;
import java.util.Objects;
+import static
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
/**
* Csv deserialization info
*/
@@ -41,27 +44,43 @@ public class CsvDeserializationInfo extends
InLongMsgDeserializationInfo {
private final String streamId;
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ private final Boolean deleteEscapeCharWhileDes;
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ private final Boolean autoAppendEscapeCharAfterDes;
+
// TODO: support mapping index to field
public CsvDeserializationInfo(
@JsonProperty("splitter") char splitter) {
- this(STREAM_ID_DEFAULT_VALUE, splitter, null);
+ this(STREAM_ID_DEFAULT_VALUE, splitter, null,
+ DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
}
public CsvDeserializationInfo(
@JsonProperty("splitter") char splitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- this(STREAM_ID_DEFAULT_VALUE, splitter, escapeChar);
+ this(STREAM_ID_DEFAULT_VALUE, splitter, escapeChar,
+ DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
}
@JsonCreator
public CsvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("splitter") char splitter,
- @JsonProperty("escape_char") @Nullable Character escapeChar) {
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("delete_escape_char_while_deserialize") @Nullable
Boolean deleteEscapeCharWhileDes,
+ @JsonProperty("auto_append_escape_char_after_deserialize")
@Nullable Boolean autoAppendEscapeCharAfterDes) {
super(streamId);
this.streamId = (StringUtils.isEmpty(streamId) ?
STREAM_ID_DEFAULT_VALUE : streamId);
this.splitter = splitter;
this.escapeChar = escapeChar;
+ this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+ this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
}
@JsonProperty("splitter")
@@ -80,6 +99,24 @@ public class CsvDeserializationInfo extends
InLongMsgDeserializationInfo {
return streamId;
}
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ public Boolean getDeleteEscapeCharWhileDes() {
+ if (deleteEscapeCharWhileDes != null) {
+ return deleteEscapeCharWhileDes;
+ }
+ return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+ }
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ public Boolean getAutoAppendEscapeCharAfterDes() {
+ if (autoAppendEscapeCharAfterDes != null) {
+ return autoAppendEscapeCharAfterDes;
+ }
+ return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -92,7 +129,9 @@ public class CsvDeserializationInfo extends
InLongMsgDeserializationInfo {
CsvDeserializationInfo other = (CsvDeserializationInfo) o;
return Objects.equals(streamId, other.getStreamId()) && splitter ==
other.splitter
- && Objects.equals(escapeChar, other.escapeChar);
+ && Objects.equals(escapeChar, other.escapeChar)
+ && Objects.equals(deleteEscapeCharWhileDes,
other.deleteEscapeCharWhileDes)
+ && Objects.equals(autoAppendEscapeCharAfterDes,
other.autoAppendEscapeCharAfterDes);
}
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index cb892c64fb..a1b8e7589c 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -27,6 +27,9 @@ import javax.annotation.Nullable;
import java.util.Objects;
+import static
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
/**
* It represents CSV2 format of InLongMsg(m=9).
*/
@@ -41,10 +44,19 @@ public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationIn
@Nullable
private final Character escapeChar;
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ private final Boolean deleteEscapeCharWhileDes;
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ private final Boolean autoAppendEscapeCharAfterDes;
+
public InLongMsgCsv2DeserializationInfo(
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter) {
- this(streamId, delimiter, null);
+ this(streamId, delimiter, null,
DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
}
@JsonCreator
@@ -52,9 +64,22 @@ public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationIn
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
+ this(streamId, delimiter, escapeChar,
DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+ }
+
+ @JsonCreator
+ public InLongMsgCsv2DeserializationInfo(
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("delete_escape_char_while_deserialize") @Nullable
Boolean deleteEscapeCharWhileDes,
+ @JsonProperty("auto_append_escape_char_after_deserialize")
@Nullable Boolean autoAppendEscapeCharAfterDes) {
super(streamId);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
+ this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+ this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
}
@JsonProperty("delimiter")
@@ -68,6 +93,24 @@ public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationIn
return escapeChar;
}
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ public Boolean getDeleteEscapeCharWhileDes() {
+ if (deleteEscapeCharWhileDes != null) {
+ return deleteEscapeCharWhileDes;
+ }
+ return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+ }
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ public Boolean getAutoAppendEscapeCharAfterDes() {
+ if (autoAppendEscapeCharAfterDes != null) {
+ return autoAppendEscapeCharAfterDes;
+ }
+ return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -81,7 +124,9 @@ public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationIn
InLongMsgCsv2DeserializationInfo other =
(InLongMsgCsv2DeserializationInfo) o;
return super.equals(other)
&& delimiter == other.delimiter
- && Objects.equals(escapeChar, other.escapeChar);
+ && Objects.equals(escapeChar, other.escapeChar)
+ && Objects.equals(deleteEscapeCharWhileDes,
other.deleteEscapeCharWhileDes)
+ && Objects.equals(autoAppendEscapeCharAfterDes,
other.autoAppendEscapeCharAfterDes);
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 197be48b47..341d8dfaae 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
import java.util.Objects;
+import static
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
/**
* It represents CSV format of InLongMsg(m=0).
*/
@@ -45,6 +48,14 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
@JsonInclude(Include.NON_NULL)
private final boolean deleteHeadDelimiter;
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ private final Boolean deleteEscapeCharWhileDes;
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ private final Boolean autoAppendEscapeCharAfterDes;
+
public InLongMsgCsvDeserializationInfo(
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter) {
@@ -64,10 +75,25 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("delete_head_delimiter") boolean
deleteHeadDelimiter) {
+ this(streamId, delimiter, escapeChar, deleteHeadDelimiter,
+ DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+ }
+
+ @JsonCreator
+ public InLongMsgCsvDeserializationInfo(
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter,
+ @JsonProperty("delete_escape_char_while_deserialize") @Nullable
Boolean deleteEscapeCharWhileDes,
+ @JsonProperty("auto_append_escape_char_after_deserialize")
@Nullable Boolean autoAppendEscapeCharAfterDes) {
super(streamId);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
this.deleteHeadDelimiter = deleteHeadDelimiter;
+ this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+ this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
}
@JsonProperty("delimiter")
@@ -86,6 +112,24 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
return deleteHeadDelimiter;
}
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ public Boolean getDeleteEscapeCharWhileDes() {
+ if (deleteEscapeCharWhileDes != null) {
+ return deleteEscapeCharWhileDes;
+ }
+ return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+ }
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ public Boolean getAutoAppendEscapeCharAfterDes() {
+ if (autoAppendEscapeCharAfterDes != null) {
+ return autoAppendEscapeCharAfterDes;
+ }
+ return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -100,6 +144,8 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
return super.equals(other)
&& delimiter == other.delimiter
&& Objects.equals(escapeChar, other.escapeChar)
- && deleteHeadDelimiter == other.deleteHeadDelimiter;
+ && deleteHeadDelimiter == other.deleteHeadDelimiter
+ && Objects.equals(deleteEscapeCharWhileDes,
other.deleteEscapeCharWhileDes)
+ && Objects.equals(autoAppendEscapeCharAfterDes,
other.autoAppendEscapeCharAfterDes);
}
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index de999da6ff..406045195b 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
import java.util.Objects;
+import static
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
/**
* It represents KV format of InLongMsg(m=5).
*/
@@ -48,6 +51,14 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
@Nullable
private final Character lineDelimiter;
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ private final Boolean deleteEscapeCharWhileDes;
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ private final Boolean autoAppendEscapeCharAfterDes;
+
public InLongMsgKvDeserializationInfo(
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("entry_delimiter") char entryDelimiter,
@@ -62,11 +73,27 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
@JsonProperty("kv_delimiter") char kvDelimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("line_delimiter") @Nullable Character lineDelimiter)
{
+ this(streamId, entryDelimiter, kvDelimiter, escapeChar, lineDelimiter,
+ DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+ }
+
+ @JsonCreator
+ public InLongMsgKvDeserializationInfo(
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
+ @JsonProperty("entry_delimiter") char entryDelimiter,
+ @JsonProperty("kv_delimiter") char kvDelimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("line_delimiter") @Nullable Character lineDelimiter,
+ @JsonProperty("delete_escape_char_while_deserialize") @Nullable
Boolean deleteEscapeCharWhileDes,
+ @JsonProperty("auto_append_escape_char_after_deserialize")
@Nullable Boolean autoAppendEscapeCharAfterDes) {
super(streamId);
this.entryDelimiter = entryDelimiter;
this.kvDelimiter = kvDelimiter;
this.escapeChar = escapeChar;
this.lineDelimiter = lineDelimiter == null ? '\n' : lineDelimiter;
+ this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+ this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
}
@JsonProperty("entry_delimiter")
@@ -91,6 +118,24 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
return lineDelimiter;
}
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ public Boolean getDeleteEscapeCharWhileDes() {
+ if (deleteEscapeCharWhileDes != null) {
+ return deleteEscapeCharWhileDes;
+ }
+ return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+ }
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ public Boolean getAutoAppendEscapeCharAfterDes() {
+ if (autoAppendEscapeCharAfterDes != null) {
+ return autoAppendEscapeCharAfterDes;
+ }
+ return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -106,6 +151,8 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
&& entryDelimiter == other.entryDelimiter
&& kvDelimiter == other.kvDelimiter
&& Objects.equals(escapeChar, other.escapeChar)
- && Objects.equals(lineDelimiter, other.lineDelimiter);
+ && Objects.equals(lineDelimiter, other.lineDelimiter)
+ && Objects.equals(deleteEscapeCharWhileDes,
other.deleteEscapeCharWhileDes)
+ && Objects.equals(autoAppendEscapeCharAfterDes,
other.autoAppendEscapeCharAfterDes);
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index ba935da023..10fe782336 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
import java.util.Objects;
+import static
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
/**
* It represents TLog CSV format of InLongMsg(m=10).
*/
@@ -42,6 +45,14 @@ public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializatio
@Nullable
private final Character escapeChar;
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ private final Boolean deleteEscapeCharWhileDes;
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ private final Boolean autoAppendEscapeCharAfterDes;
+
public InLongMsgTlogCsvDeserializationInfo(
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter) {
@@ -53,9 +64,22 @@ public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializatio
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
+ this(streamId, delimiter, escapeChar,
DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+ }
+
+ @JsonCreator
+ public InLongMsgTlogCsvDeserializationInfo(
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("delete_escape_char_while_deserialize") @Nullable
Boolean deleteEscapeCharWhileDes,
+ @JsonProperty("auto_append_escape_char_after_deserialize")
@Nullable Boolean autoAppendEscapeCharAfterDes) {
super(streamId);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
+ this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+ this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
}
@JsonProperty("delimiter")
@@ -69,6 +93,24 @@ public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializatio
return escapeChar;
}
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ public Boolean getDeleteEscapeCharWhileDes() {
+ if (deleteEscapeCharWhileDes != null) {
+ return deleteEscapeCharWhileDes;
+ }
+ return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+ }
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ public Boolean getAutoAppendEscapeCharAfterDes() {
+ if (autoAppendEscapeCharAfterDes != null) {
+ return autoAppendEscapeCharAfterDes;
+ }
+ return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -82,7 +124,9 @@ public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializatio
InLongMsgTlogCsvDeserializationInfo other =
(InLongMsgTlogCsvDeserializationInfo) o;
return super.equals(other)
&& delimiter == other.delimiter
- && Objects.equals(escapeChar, other.escapeChar);
+ && Objects.equals(escapeChar, other.escapeChar)
+ && Objects.equals(deleteEscapeCharWhileDes,
other.deleteEscapeCharWhileDes)
+ && Objects.equals(autoAppendEscapeCharAfterDes,
other.autoAppendEscapeCharAfterDes);
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index 6ec72fdd7a..7726818498 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
import java.util.Objects;
+import static
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
/**
* It represents TLog KV format of InLongMsg(m=15).
*/
@@ -46,6 +49,14 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
@Nullable
private final Character escapeChar;
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ private final Boolean deleteEscapeCharWhileDes;
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ private final Boolean autoAppendEscapeCharAfterDes;
+
public InLongMsgTlogKvDeserializationInfo(
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
@JsonProperty("delimiter") char delimiter,
@@ -61,11 +72,27 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
+ this(streamId, delimiter, entryDelimiter, kvDelimiter, escapeChar,
+ DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+ }
+
+ @JsonCreator
+ public InLongMsgTlogKvDeserializationInfo(
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("entry_delimiter") char entryDelimiter,
+ @JsonProperty("kv_delimiter") char kvDelimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("delete_escape_char_while_deserialize") @Nullable
Boolean deleteEscapeCharWhileDes,
+ @JsonProperty("auto_append_escape_char_after_deserialize")
@Nullable Boolean autoAppendEscapeCharAfterDes) {
super(streamId);
this.delimiter = delimiter;
this.entryDelimiter = entryDelimiter;
this.kvDelimiter = kvDelimiter;
this.escapeChar = escapeChar;
+ this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+ this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
}
@JsonProperty("delimiter")
@@ -89,6 +116,24 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
return escapeChar;
}
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ public Boolean getDeleteEscapeCharWhileDes() {
+ if (deleteEscapeCharWhileDes != null) {
+ return deleteEscapeCharWhileDes;
+ }
+ return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+ }
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ public Boolean getAutoAppendEscapeCharAfterDes() {
+ if (autoAppendEscapeCharAfterDes != null) {
+ return autoAppendEscapeCharAfterDes;
+ }
+ return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -104,6 +149,8 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
&& delimiter == other.delimiter
&& entryDelimiter == other.entryDelimiter
&& kvDelimiter == other.kvDelimiter
- && Objects.equals(escapeChar, other.escapeChar);
+ && Objects.equals(escapeChar, other.escapeChar)
+ && Objects.equals(deleteEscapeCharWhileDes,
other.deleteEscapeCharWhileDes)
+ && Objects.equals(autoAppendEscapeCharAfterDes,
other.autoAppendEscapeCharAfterDes);
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index dd14eaebdd..ad22b35f53 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -28,6 +28,9 @@ import javax.annotation.Nullable;
import java.util.Objects;
+import static
org.apache.inlong.sort.protocol.constant.Constant.AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+import static
org.apache.inlong.sort.protocol.constant.Constant.DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+
/**
* Kv deserialization info
*/
@@ -46,6 +49,14 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
@Nullable
private final Character escapeChar;
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ private final Boolean deleteEscapeCharWhileDes;
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ private final Boolean autoAppendEscapeCharAfterDes;
+
public KvDeserializationInfo(
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter) {
@@ -65,11 +76,26 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
+ this(streamId, entrySplitter, kvSplitter, escapeChar,
+ DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT,
+ AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT);
+ }
+
+ @JsonCreator
+ public KvDeserializationInfo(
+ @JsonProperty("streamId") @JsonAlias(value = {"tid"}) String
streamId,
+ @JsonProperty("entry_splitter") char entrySplitter,
+ @JsonProperty("kv_splitter") char kvSplitter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("delete_escape_char_while_deserialize") @Nullable
Boolean deleteEscapeCharWhileDes,
+ @JsonProperty("auto_append_escape_char_after_deserialize")
@Nullable Boolean autoAppendEscapeCharAfterDes) {
super(streamId);
this.streamId = (StringUtils.isEmpty(streamId) ?
STREAM_ID_DEFAULT_VALUE : streamId);
this.entrySplitter = entrySplitter;
this.kvSplitter = kvSplitter;
this.escapeChar = escapeChar;
+ this.deleteEscapeCharWhileDes = deleteEscapeCharWhileDes;
+ this.autoAppendEscapeCharAfterDes = autoAppendEscapeCharAfterDes;
}
@JsonProperty("entry_splitter")
@@ -93,6 +119,24 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
return streamId;
}
+ @JsonProperty("delete_escape_char_while_deserialize")
+ @Nullable
+ public Boolean getDeleteEscapeCharWhileDes() {
+ if (deleteEscapeCharWhileDes != null) {
+ return deleteEscapeCharWhileDes;
+ }
+ return DELETE_ESCAPE_CHAR_WHILE_DESERIALIZE_DEFAULT;
+ }
+
+ @JsonProperty("auto_append_escape_char_after_deserialize")
+ @Nullable
+ public Boolean getAutoAppendEscapeCharAfterDes() {
+ if (autoAppendEscapeCharAfterDes != null) {
+ return autoAppendEscapeCharAfterDes;
+ }
+ return AUTO_APPEND_ESCAPE_CHAR_AFTER_DESERIALIZE_DEFAULT;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -106,6 +150,8 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
KvDeserializationInfo other = (KvDeserializationInfo) o;
return Objects.equals(streamId, other.getStreamId()) && entrySplitter
== other.entrySplitter
&& kvSplitter == other.kvSplitter
- && Objects.equals(escapeChar, other.escapeChar);
+ && Objects.equals(escapeChar, other.escapeChar)
+ && Objects.equals(deleteEscapeCharWhileDes,
other.deleteEscapeCharWhileDes)
+ && Objects.equals(autoAppendEscapeCharAfterDes,
other.autoAppendEscapeCharAfterDes);
}
}