This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ec3b69e647 [INLONG-11819][Sort] KV/CSV format support keep escape, 
using line delimiter and call back when parse field has exception (#11820)
ec3b69e647 is described below

commit ec3b69e647e68234cf1b691d709afd33baa6e535
Author: Mingyu Bao <[email protected]>
AuthorDate: Tue Apr 15 14:07:03 2025 +0800

    [INLONG-11819][Sort] KV/CSV format support keep escape, using line 
delimiter and call back when parse field has exception (#11820)
---
 .../inlong/sort/formats/base/TableFormatUtils.java |   7 +-
 .../sort/formats/base/TextFormatBuilder.java       |   7 ++
 .../sort/formats/inlongmsg/FailureHandler.java     |  14 +++
 .../formats/inlongmsg/IgnoreFailureHandler.java    |   9 ++
 .../sort/formats/inlongmsg/NoOpFailureHandler.java |   9 ++
 .../inlong/sort/formats/util/StringUtils.java      |  30 ++++--
 .../sort/formats/common/StringUtilsTest.java       |  84 ++++++++++------
 .../sort/formats/base/TableFormatUtilsTest.java    |  28 +++---
 .../sort/formats/csv/CsvDeserializationSchema.java |   2 +-
 .../sort/formats/inlongmsg/row/InLongMsgUtils.java |   2 +
 .../InLongMsgBinlogFormatDeserializer.java         |   3 +-
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java      |   7 +-
 .../InLongMsgCsvFormatDeserializer.java            |  17 +++-
 .../InLongMsgCsvMixedFormatConverter.java          |   2 +-
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java    |   9 +-
 .../InLongMsgCsvFormatDeserializerTest.java        |   6 ++
 .../inlongmsgkv/InLongMsgKvFormatDeserializer.java |   2 +-
 .../InLongMsgKvMixedFormatConverter.java           |   2 +-
 .../sort/formats/inlongmsgkv/InLongMsgKvUtils.java |   8 +-
 .../InLongMsgKvFormatDeserializerTest.java         |   6 ++
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java    |   6 +-
 .../InLongMsgTlogCsvFormatDeserializerTest.java    |   6 ++
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java      |   6 +-
 .../InLongMsgTlogKvFormatDeserializerTest.java     |   6 ++
 .../sort/formats/kv/KvDeserializationSchema.java   |   2 +-
 .../sort/formats/inlongmsg/InLongMsgUtils.java     |   5 +
 .../InLongMsgBinlogFormatDeserializer.java         |   3 +-
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java      |  20 ++--
 .../InLongMsgCsvFormatDeserializer.java            |  52 ++++++++--
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java    |  13 ++-
 .../InLongMsgCsvFormatDeserializerTest.java        |  10 +-
 .../inlongmsgkv/InLongMsgKvFormatDeserializer.java |  18 +++-
 .../sort/formats/inlongmsgkv/InLongMsgKvUtils.java |  15 ++-
 .../InLongMsgKvFormatDeserializerTest.java         |   9 +-
 .../InLongMsgTlogCsvFormatDeserializer.java        | 111 +++++++++++++++++++--
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java    |  44 +++++---
 .../InLongMsgTlogCsvFormatDeserializerTest.java    |  15 ++-
 .../InLongMsgTlogKvFormatDeserializer.java         |  71 ++++++++++---
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java      |  28 ++++--
 .../InLongMsgTlogKvFormatDeserializerTest.java     |  38 ++++---
 .../sort/formats/base/TableFormatUtilsTest.java    |  28 +++---
 .../csv/CsvRowDataDeserializationSchema.java       |  27 ++++-
 .../formats/kv/KvRowDataDeserializationSchema.java |  53 ++++++++--
 .../apache/inlong/sort/formats/kv/KvUtilsTest.java | 100 ++++++++++++++++++-
 44 files changed, 748 insertions(+), 192 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index f68a0085f0..31e3cdaf09 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -58,6 +58,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.TimestampTypeInf
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.TypeInfo;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatInfo;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -547,7 +548,8 @@ public class TableFormatUtils {
             String fieldName,
             FormatInfo fieldFormatInfo,
             String fieldText,
-            String nullLiteral) {
+            String nullLiteral,
+            FailureHandler failureHandler) throws Exception {
         checkState(fieldFormatInfo instanceof BasicFormatInfo);
 
         if (fieldText == null) {
@@ -573,6 +575,9 @@ public class TableFormatUtils {
         } catch (Exception e) {
             LOG.warn("Could not properly deserialize the " + "text "
                     + fieldText + " for field " + fieldName + ".", e);
+            if (failureHandler != null) {
+                failureHandler.onConvertingFieldFailure(fieldName, fieldText, 
fieldFormatInfo, e);
+            }
         }
         return null;
     }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
index b329c79149..12576d3e76 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.formats.base;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 
@@ -44,6 +45,7 @@ public abstract class TextFormatBuilder<T extends 
TextFormatBuilder> {
     protected Character quoteChar = DEFAULT_QUOTE_CHARACTER;
     protected String nullLiteral = DEFAULT_NULL_LITERAL;
     protected boolean ignoreErrors = DEFAULT_IGNORE_ERRORS;
+    protected FailureHandler failureHandler;
 
     protected TextFormatBuilder(RowFormatInfo rowFormatInfo) {
         this.rowFormatInfo = rowFormatInfo;
@@ -79,6 +81,11 @@ public abstract class TextFormatBuilder<T extends 
TextFormatBuilder> {
         return (T) this;
     }
 
+    public T setFailureHandler(FailureHandler failureHandler) {
+        this.failureHandler = failureHandler;
+        return (T) this;
+    }
+
     @SuppressWarnings("unchecked")
     public T configure(DescriptorProperties descriptorProperties) {
 
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
index 6bdbf36103..e07b9fc815 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+
 import java.io.Serializable;
 
 /**
@@ -71,4 +73,16 @@ public interface FailureHandler extends Serializable {
      */
     void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, 
Exception exception) throws Exception;
 
+    /**
+     * This method is called when a failure occurs while converting 
any field to row.
+     *
+     * @param fieldName the field name
+     * @param fieldText the field text
+     * @param formatInfo the field target type info
+     * @param exception the thrown exception
+     * @throws Exception the exception
+     */
+    void onConvertingFieldFailure(String fieldName, String fieldText, 
FormatInfo formatInfo,
+            Exception exception) throws Exception;
+
 }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
index b27fefc891..2afa460def 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +49,13 @@ public class IgnoreFailureHandler implements FailureHandler {
         LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body, 
exception);
     }
 
+    @Override
+    public void onConvertingFieldFailure(String fieldName, String fieldText, 
FormatInfo formatInfo,
+            Exception exception) throws Exception {
+        LOG.warn("Cannot convert the InLongMsg Filed (fieldName = {}, 
formatInfo = {}, fieldText = {}),",
+                fieldName, formatInfo, fieldText, exception);
+    }
+
     @Override
     public boolean isIgnoreFailure() {
         return true;
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
index d049920be1..9c4c764c49 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +53,13 @@ public class NoOpFailureHandler implements FailureHandler {
         throw exception;
     }
 
+    @Override
+    public void onConvertingFieldFailure(String fieldName, String fieldText, 
FormatInfo formatInfo,
+            Exception exception) throws Exception {
+        LOG.warn("Cannot convert the InLongMsg Filed (fieldName = {}, 
formatInfo = {}, fieldText = {}),",
+                fieldName, formatInfo, fieldText, exception);
+    }
+
     @Override
     public boolean isIgnoreFailure() {
         return false;
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
index 000d7a7175..807bbe00c4 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
@@ -39,7 +39,7 @@ public class StringUtils {
     private static final int STATE_QUOTING = 16;
 
     /**
-     * @see StringUtils#splitKv(String, Character, Character, 
Character,Character, Character)
+     * @see StringUtils#splitKv(String, Character, Character, 
Character,Character)
      */
     public static Map<String, String> splitKv(
             @Nonnull String text,
@@ -48,7 +48,8 @@ public class StringUtils {
             @Nullable Character escapeChar,
             @Nullable Character quoteChar) {
         List<Map<String, String>> lines =
-                splitKv(text, entryDelimiter, kvDelimiter, escapeChar, 
quoteChar, null);
+                splitKv(text, entryDelimiter, kvDelimiter, escapeChar, 
quoteChar, null,
+                        true);
         if (lines.size() == 0) {
             return new HashMap<>();
         }
@@ -77,7 +78,8 @@ public class StringUtils {
             @Nonnull Character kvDelimiter,
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
-            @Nullable Character lineDelimiter) {
+            @Nullable Character lineDelimiter,
+            @Nullable boolean isDeleteEscapeChar) {
         Map<String, String> fields = new HashMap<>();
         List<Map<String, String>> lines = new ArrayList<>();
 
@@ -158,6 +160,9 @@ public class StringUtils {
                     case STATE_VALUE:
                         kvState = state;
                         state = STATE_ESCAPING;
+                        if (!isDeleteEscapeChar) {
+                            stringBuilder.append(ch);
+                        }
                         break;
                     case STATE_ESCAPING:
                         stringBuilder.append(ch);
@@ -369,7 +374,7 @@ public class StringUtils {
     /**
      * Splits a single line of csv text.
      *
-     * @see StringUtils#splitCsv(String, Character, Character, Character, 
Character, boolean)
+     * @see StringUtils#splitCsv(String, Character, Character, Character, 
Character)
      */
     public static String[] splitCsv(
             @Nonnull String text,
@@ -384,7 +389,7 @@ public class StringUtils {
     }
 
     /**
-     * @see StringUtils#splitCsv(String, Character, Character, Character, 
Character, boolean)
+     * @see StringUtils#splitCsv(String, Character, Character, Character, 
Character)
      */
     public static String[][] splitCsv(
             @Nonnull String text,
@@ -392,11 +397,12 @@ public class StringUtils {
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
             @Nullable Character lineDelimiter) {
-        return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, 
false);
+        return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
+                false, true);
     }
 
     /**
-     * @see StringUtils#splitCsv(String, Character, Character, Character, 
Character, boolean, Integer)
+     * @see StringUtils#splitCsv(String, Character, Character, Character, 
Character, boolean, boolean)
      */
     public static String[][] splitCsv(
             @Nonnull String text,
@@ -404,8 +410,10 @@ public class StringUtils {
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
             @Nullable Character lineDelimiter,
-            boolean deleteHeadDelimiter) {
-        return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, 
deleteHeadDelimiter, null);
+            boolean deleteHeadDelimiter,
+            boolean isDeleteEscapeChar) {
+        return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
+                deleteHeadDelimiter, isDeleteEscapeChar, null);
     }
 
     /**
@@ -434,6 +442,7 @@ public class StringUtils {
             @Nullable Character quoteChar,
             @Nullable Character lineDelimiter,
             boolean deleteHeadDelimiter,
+            boolean isDeleteEscapeChar,
             @Nullable Integer maxFieldSize) {
         if (maxFieldSize != null && maxFieldSize <= 0) {
             return new String[0][];
@@ -481,6 +490,9 @@ public class StringUtils {
                 switch (state) {
                     case STATE_NORMAL:
                         state = STATE_ESCAPING;
+                        if (!isDeleteEscapeChar) {
+                            stringBuilder.append(ch);
+                        }
                         break;
                     case STATE_ESCAPING:
                         stringBuilder.append(ch);
diff --git 
a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
 
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
index b9c88ed788..5da3dfab7c 100644
--- 
a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
+++ 
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
@@ -34,27 +34,27 @@ public class StringUtilsTest {
     public void testSplitKvString() {
 
         String kvString1 = "name=n&age=10";
-        Map<String, String> map1 = StringUtils.splitKv(kvString1, '&',
-                '=', '\\', '\'');
-        assertEquals("n", map1.get("name"));
-        assertEquals("10", map1.get("age"));
+        List<Map<String, String>> listMap1 = StringUtils.splitKv(kvString1, 
'&',
+                '=', '\\', '\'', '\n', true);
+        assertEquals("n", listMap1.get(0).get("name"));
+        assertEquals("10", listMap1.get(0).get("age"));
 
         String kvString2 = "name=&age=20&";
-        Map<String, String> map2 = StringUtils.splitKv(kvString2, '&',
-                '=', '\\', '\'');
-        assertEquals("", map2.get("name"));
-        assertEquals("20&", map2.get("age"));
+        List<Map<String, String>> listMap2 = StringUtils.splitKv(kvString2, 
'&',
+                '=', '\\', '\'', '\n', true);
+        assertEquals("", listMap2.get(0).get("name"));
+        assertEquals("20&", listMap2.get(0).get("age"));
 
         String kvString3 = "name==&age=20&&&value=aaa&dddd&";
-        Map<String, String> map3 = StringUtils.splitKv(kvString3, '&',
-                '=', '\\', '\'');
-        assertEquals("=", map3.get("name"));
-        assertEquals("20&&", map3.get("age"));
-        assertEquals("aaa&dddd&", map3.get("value"));
+        List<Map<String, String>> listMap3 = StringUtils.splitKv(kvString3, 
'&',
+                '=', '\\', '\'', '\n', true);
+        assertEquals("=", listMap3.get(0).get("name"));
+        assertEquals("20&&", listMap3.get(0).get("age"));
+        assertEquals("aaa&dddd&", listMap3.get(0).get("value"));
 
         String kvString4 = "name==&age=20&&\nname1==&age1=20&&";
         List<Map<String, String>> map4 = StringUtils.splitKv(kvString4, '&',
-                '=', '\\', '\'', '\n');
+                '=', '\\', '\'', '\n', true);
         assertEquals("=", map4.get(0).get("name"));
         assertEquals("20&&", map4.get(0).get("age"));
         assertEquals("=", map4.get(1).get("name1"));
@@ -62,7 +62,7 @@ public class StringUtilsTest {
 
         String kvString5 = 
"name==&age=20&&\nname1==&age1=20&&&value=aaa&dddd&";
         List<Map<String, String>> map5 = StringUtils.splitKv(kvString5, '&',
-                '=', '\\', '\'', '\n');
+                '=', '\\', '\'', '\n', true);
         assertEquals("=", map5.get(0).get("name"));
         assertEquals("20&&", map5.get(0).get("age"));
         assertEquals("=", map5.get(1).get("name1"));
@@ -71,25 +71,25 @@ public class StringUtilsTest {
 
         String kvString6 = "name==&age=20&&\\";
         List<Map<String, String>> map6 = StringUtils.splitKv(kvString6, '&',
-                '=', '\\', '\'', '\n');
+                '=', '\\', '\'', '\n', true);
         assertEquals("=", map6.get(0).get("name"));
         assertEquals("20&&", map6.get(0).get("age"));
 
         String kvString7 = "name==&age=20&&'";
         List<Map<String, String>> map7 = StringUtils.splitKv(kvString7, '&',
-                '=', '\\', '\'', '\n');
+                '=', '\\', '\'', '\n', true);
         assertEquals("=", map7.get(0).get("name"));
         assertEquals("20&&", map7.get(0).get("age"));
 
         String kvString8 = "name=\\=&age=20&a&'";
         List<Map<String, String>> map8 = StringUtils.splitKv(kvString8, '&',
-                '=', '\\', '\'', '\n');
+                '=', '\\', '\'', '\n', true);
         assertEquals("=", map8.get(0).get("name"));
         assertEquals("20&a&", map8.get(0).get("age"));
 
         String kvString9 = "name=\\=&age=20&a\\&'";
         List<Map<String, String>> map9 = StringUtils.splitKv(kvString9, '&',
-                '=', '\\', '\'', '\n');
+                '=', '\\', '\'', '\n', true);
         assertEquals("=", map8.get(0).get("name"));
         assertEquals("20&a&", map8.get(0).get("age"));
     }
@@ -115,22 +115,46 @@ public class StringUtilsTest {
         assertEquals("home", csv1Array2[2][2]);
     }
 
+    @Test
+    public void testSplitCsvStringWhiteEscape() {
+        String csvString1 = "name|age=20\\||&'";
+        String[][] csv1Array1 = StringUtils.splitCsv(csvString1, '|',
+                '\\', '\'', '\n', false, false);
+
+        assertEquals("age=20\\|", csv1Array1[0][1]);
+        assertEquals("&", csv1Array1[0][2]);
+
+        String csvString2 = 
"name|age=20\\||&'\n\name|age=20\\||&'\n\n|home|\\home\\";
+        String[][] csv1Array2 = StringUtils.splitCsv(csvString2, '|',
+                '\\', '\'', '\n', false, false);
+
+        assertEquals("name", csv1Array2[0][0]);
+        assertEquals("age=20\\|", csv1Array2[0][1]);
+        assertEquals("&\n\name|age=20\\||&", csv1Array2[0][2]);
+        assertEquals("", csv1Array2[2][0]);
+        assertEquals("home", csv1Array2[2][1]);
+        assertEquals("\\home\\", csv1Array2[2][2]);
+    }
+
     @Test
     public void testSplitCsvStringWithMaxFields() {
 
         String csvString = 
"name|age=20\\||&'\n\name|age=20\\||&'\n\n|home|\\home\\";
         String[][] csv1Array0 = StringUtils.splitCsv(csvString, '|',
-                '\\', '\'', '\n', false, 0);
+                '\\', '\'', '\n', false, true,
+                0);
         assertEquals(0, csv1Array0.length);
 
         String[][] csv1Array1 = StringUtils.splitCsv(csvString, '|',
-                '\\', '\'', '\n', false, 1);
+                '\\', '\'', '\n', false, true,
+                1);
         assertEquals("name|age=20\\||&'\n\name|age=20\\||&'", 
csv1Array1[0][0]);
         assertEquals("", csv1Array1[1][0]);
         assertEquals("|home|\\home\\", csv1Array1[2][0]);
 
         String[][] csv1Array2 = StringUtils.splitCsv(csvString, '|',
-                '\\', '\'', '\n', false, 2);
+                '\\', '\'', '\n', false, true,
+                2);
         assertEquals("name", csv1Array2[0][0]);
         assertEquals("age=20\\||&'\n\name|age=20\\||&'", csv1Array2[0][1]);
         assertEquals("", csv1Array2[1][0]);
@@ -138,7 +162,8 @@ public class StringUtilsTest {
         assertEquals("home|\\home\\", csv1Array2[2][1]);
 
         String[][] csv1Array3 = StringUtils.splitCsv(csvString, '|',
-                '\\', '\'', '\n', false, 3);
+                '\\', '\'', '\n', false, true,
+                3);
         assertEquals("name", csv1Array3[0][0]);
         assertEquals("age=20|", csv1Array3[0][1]);
         assertEquals("&\n\name|age=20\\||&", csv1Array3[0][2]);
@@ -147,7 +172,8 @@ public class StringUtilsTest {
         assertEquals("home", csv1Array3[2][2]);
 
         String[][] csv1Array4 = StringUtils.splitCsv(csvString, '|',
-                '\\', '\'', '\n', false, 4);
+                '\\', '\'', '\n', false, true,
+                4);
         assertEquals("name", csv1Array4[0][0]);
         assertEquals("age=20|", csv1Array4[0][1]);
         assertEquals("&\n\name|age=20\\||&", csv1Array4[0][2]);
@@ -159,9 +185,11 @@ public class StringUtilsTest {
     @Test
     public void testKvScapeCharSplit() {
         String text = "k1=v1&\nk\\2=v2\\&&k3=v3";
-        Map<String, String> kvMap = splitKv(text, '&', '=', '\\', null);
-        Assert.assertTrue(kvMap != null && kvMap.size() == 3);
-        Assert.assertTrue(kvMap.get("k3") != null);
-        Assert.assertTrue(kvMap.get("\nk2") != null);
+        List<Map<String, String>> kvMapList = splitKv(text, '&', '=', '\\',
+                null, '\n', false);
+        Assert.assertTrue(kvMapList != null && kvMapList.size() == 2);
+        Assert.assertTrue(kvMapList.get(0).get("k3") == null);
+        Assert.assertTrue(kvMapList.get(1).get("\nk2") == null);
+        Assert.assertTrue(kvMapList.get(1).get("k\\2") != null);
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
 
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 1487cea9e2..c47a267e59 100644
--- 
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -34,13 +34,13 @@ import static org.junit.Assert.assertNull;
 public class TableFormatUtilsTest {
 
     @Test
-    public void testDeserializeStringWithoutNullLiteral() {
+    public void testDeserializeStringWithoutNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         StringFormatInfo.INSTANCE,
                         "data",
-                        null);
+                        null, null);
         assertEquals("data", result1);
 
         Object result2 =
@@ -48,7 +48,7 @@ public class TableFormatUtilsTest {
                         "f",
                         StringFormatInfo.INSTANCE,
                         "",
-                        null);
+                        null, null);
         assertEquals("", result2);
     }
 
@@ -72,13 +72,13 @@ public class TableFormatUtilsTest {
     }
 
     @Test
-    public void testDeserializeStringWithNullLiteral() {
+    public void testDeserializeStringWithNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         StringFormatInfo.INSTANCE,
                         "data",
-                        "n/a");
+                        "n/a", null);
         assertEquals("data", result1);
 
         Object result2 =
@@ -86,7 +86,7 @@ public class TableFormatUtilsTest {
                         "f",
                         StringFormatInfo.INSTANCE,
                         "",
-                        "n/a");
+                        "n/a", null);
         assertEquals("", result2);
 
         Object result3 =
@@ -94,7 +94,7 @@ public class TableFormatUtilsTest {
                         "f",
                         StringFormatInfo.INSTANCE,
                         "n/a",
-                        "n/a");
+                        "n/a", null);
         assertNull(result3);
     }
 
@@ -126,13 +126,13 @@ public class TableFormatUtilsTest {
     }
 
     @Test
-    public void testDeserializeNumberWithoutNullLiteral() {
+    public void testDeserializeNumberWithoutNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         IntFormatInfo.INSTANCE,
                         "1",
-                        null);
+                        null, null);
         assertEquals(1, result1);
 
         Object result2 =
@@ -140,7 +140,7 @@ public class TableFormatUtilsTest {
                         "f",
                         IntFormatInfo.INSTANCE,
                         "",
-                        null);
+                        null, null);
         assertNull(result2);
     }
 
@@ -164,13 +164,13 @@ public class TableFormatUtilsTest {
     }
 
     @Test
-    public void testDeserializeNumberWithNullLiteral() {
+    public void testDeserializeNumberWithNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         IntFormatInfo.INSTANCE,
                         "1",
-                        "n/a");
+                        "n/a", null);
         assertEquals(1, result1);
 
         try {
@@ -178,7 +178,7 @@ public class TableFormatUtilsTest {
                     "f",
                     IntFormatInfo.INSTANCE,
                     "",
-                    "n/a");
+                    "n/a", null);
             Assert.assertEquals(null, result2);
         } catch (Exception e) {
             // ignored
@@ -189,7 +189,7 @@ public class TableFormatUtilsTest {
                         "f",
                         IntFormatInfo.INSTANCE,
                         "n/a",
-                        "n/a");
+                        "n/a", null);
         assertNull(result3);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
index 39bfb5f7a2..2fb51da8ca 100644
--- 
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
@@ -160,7 +160,7 @@ public final class CsvDeserializationSchema extends 
DefaultDeserializationSchema
                                     fieldNames[i],
                                     fieldFormatInfos[i],
                                     fieldTexts[i],
-                                    nullLiteral);
+                                    nullLiteral, null);
 
                     row.setField(i, field);
                 }
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
index 00391b2911..ab8377d14f 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
@@ -79,10 +79,12 @@ public class InLongMsgUtils {
     public static final String FORMAT_TIME_FIELD_NAME = 
"format.time-field-name";
     public static final String FORMAT_ATTRIBUTES_FIELD_NAME = 
"format.attributes-field-name";
     public static final String FORMAT_RETAIN_PREDEFINED_FIELD = 
"format.retain-predefined-field";
+    public static final String FORMAT_APPEND_ESCAPE_FIELD = 
"format.append-escape";
 
     public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time";
     public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = 
"inlongmsg_attributes";
     public static final boolean DEFAULT_PREDEFINED_FIELD = true;
+    public static final boolean DEFAULT_APPEND_ESCAPE = false;
 
     public static final TypeInformation<Row> MIXED_ROW_TYPE =
             Types.ROW_NAMED(
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 5128d51be5..9cb52d3d07 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -30,7 +30,6 @@ import org.apache.flink.types.Row;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -152,7 +151,7 @@ public final class InLongMsgBinlogFormatDeserializer 
extends AbstractInLongMsgFo
     }
 
     @Override
-    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws IOException {
+    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws Exception {
         return InLongMsgBinlogUtils.getRows(
                 rowFormatInfo,
                 timeFieldName,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 59dbd84667..4eb3a042f6 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.types.Row;
 
-import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -186,7 +185,7 @@ public class InLongMsgBinlogUtils {
             String metadataFieldName,
             Map<String, String> attributes,
             byte[] bytes,
-            boolean includeUpdateBefore) throws IOException {
+            boolean includeUpdateBefore) throws Exception {
 
         InLongBinlog.RowData rowData = InLongBinlog.RowData.parseFrom(bytes);
 
@@ -256,7 +255,7 @@ public class InLongMsgBinlogUtils {
             Map<String, String> attributes,
             InLongBinlog.RowData rowData,
             String operation,
-            List<InLongBinlog.Column> columns) {
+            List<InLongBinlog.Column> columns) throws Exception {
         List<Object> headFields = new ArrayList<>();
 
         if (timeFieldName != null) {
@@ -311,7 +310,7 @@ public class InLongMsgBinlogUtils {
                                 fieldName,
                                 dataFieldFormatInfos[i],
                                 fieldText,
-                                null);
+                                null, null);
                 row.setField(i + headFields.size(), field);
             }
         }
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index ab6a748056..532e71931c 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -40,9 +40,11 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_D
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_APPEND_ESCAPE;
 import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
 import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_APPEND_ESCAPE_FIELD;
 import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
 import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
@@ -120,6 +122,11 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
      */
     private boolean retainPredefinedField = true;
 
+    /**
+     * True if the configured escape char is appended, default false.
+     */
+    private boolean appendEscapeChar = false;
+
     public InLongMsgCsvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nullable String timeFieldName,
@@ -223,7 +230,7 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
     }
 
     @Override
-    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
+    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws Exception {
         Row dataRow =
                 InLongMsgCsvUtils.deserializeRow(
                         rowFormatInfo,
@@ -252,6 +259,7 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
         private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
         private Boolean deleteHeadDelimiter = DEFAULT_DELETE_HEAD_DELIMITER;
         private Boolean retainPredefinedField = DEFAULT_PREDEFINED_FIELD;
+        private Boolean appendEscapeChar = DEFAULT_APPEND_ESCAPE;
 
         public Builder(RowFormatInfo rowFormatInfo) {
             super(rowFormatInfo);
@@ -287,6 +295,11 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
             return this;
         }
 
+        public Builder setAppendEscapeChar(Boolean appendEscapeChar) {
+            this.appendEscapeChar = appendEscapeChar;
+            return this;
+        }
+
         @Override
         public Builder configure(DescriptorProperties descriptorProperties) {
             super.configure(descriptorProperties);
@@ -303,6 +316,8 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
                     .ifPresent(this::setDeleteHeadDelimiter);
             
descriptorProperties.getOptionalBoolean(FORMAT_RETAIN_PREDEFINED_FIELD)
                     .ifPresent(this::setRetainPredefinedField);
+            descriptorProperties.getOptionalBoolean(FORMAT_APPEND_ESCAPE_FIELD)
+                    .ifPresent(this::setAppendEscapeChar);
             return this;
         }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index 6b4400f548..7b2e30e6f1 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -96,7 +96,7 @@ public class InLongMsgCsvMixedFormatConverter extends 
AbstractInLongMsgMixedForm
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
-            Map<String, String> entries) {
+            Map<String, String> entries) throws Exception {
         Row dataRow =
                 InLongMsgCsvUtils.deserializeRow(
                         rowFormatInfo,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index c347f5861b..e4e4e5659e 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -108,7 +108,8 @@ public class InLongMsgCsvUtils {
         String bodyStr = new String(bytes, Charset.forName(charset));
 
         String[][] split =
-                splitCsv(bodyStr, delimiter, escapeChar, quoteChar, 
lineDelimiter, deleteHeadDelimiter);
+                splitCsv(bodyStr, delimiter, escapeChar, quoteChar, 
lineDelimiter, deleteHeadDelimiter,
+                        true);
 
         return Arrays.stream(split)
                 .map((line) -> {
@@ -135,7 +136,7 @@ public class InLongMsgCsvUtils {
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
             List<String> predefinedFields,
-            List<String> fields) {
+            List<String> fields) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -164,7 +165,7 @@ public class InLongMsgCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i, field);
         }
 
@@ -184,7 +185,7 @@ public class InLongMsgCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i + predefinedFields.size(), field);
         }
         for (int i = predefinedFields.size() + fields.size(); i < 
fieldNames.length; ++i) {
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 8508aaa90e..b915519f66 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -744,5 +744,11 @@ public class InLongMsgCsvFormatDeserializerTest {
                 InLongMsgBody body, Exception exception) throws Exception {
             rowCount++;
         }
+
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
\ No newline at end of file
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index 1fecb43d5f..b880a4817a 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -225,7 +225,7 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
     }
 
     @Override
-    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
+    protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) 
throws Exception {
         Row dataRow =
                 InLongMsgKvUtils.deserializeRow(
                         rowFormatInfo,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index a70ff5a1a3..ffdba490bd 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -95,7 +95,7 @@ public class InLongMsgKvMixedFormatConverter extends 
AbstractInLongMsgMixedForma
             Timestamp time,
             List<String> predefinedFields,
             List<String> fields,
-            Map<String, String> entries) {
+            Map<String, String> entries) throws Exception {
         Row dataRow =
                 InLongMsgKvUtils.deserializeRow(
                         rowFormatInfo,
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index faeb11fa18..d8414ca038 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -101,7 +101,7 @@ public class InLongMsgKvUtils {
                         kvDelimiter,
                         escapeChar,
                         quoteChar,
-                        lineDelimiter);
+                        lineDelimiter, true);
 
         return list.stream().map((line) -> {
             return new InLongMsgBody(
@@ -125,7 +125,7 @@ public class InLongMsgKvUtils {
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
             List<String> predefinedFields,
-            Map<String, String> entries) {
+            Map<String, String> entries) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -147,7 +147,7 @@ public class InLongMsgKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i, field);
         }
 
@@ -163,7 +163,7 @@ public class InLongMsgKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i, field);
         }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index 045b423f94..1c25f93d82 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -329,5 +329,11 @@ public class InLongMsgKvFormatDeserializerTest {
                 Exception exception) throws Exception {
             rowCount++;
         }
+
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index d9a699b174..82bc8e2ebc 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -110,7 +110,7 @@ public class InLongMsgTlogCsvUtils {
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
             List<String> predefinedFields,
-            List<String> fields) {
+            List<String> fields) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -138,7 +138,7 @@ public class InLongMsgTlogCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i, field);
         }
 
@@ -158,7 +158,7 @@ public class InLongMsgTlogCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i + predefinedFields.size(), field);
         }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 7d66b9ba1f..002ddfe542 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -285,5 +285,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
                 InLongMsgBody body, Exception exception) throws Exception {
             rowCount++;
         }
+
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index b2cecd8248..6982b5bea0 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -110,7 +110,7 @@ public class InLongMsgTlogKvUtils {
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
             List<String> predefinedFields,
-            Map<String, String> entries) {
+            Map<String, String> entries) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -132,7 +132,7 @@ public class InLongMsgTlogKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i, field);
         }
 
@@ -147,7 +147,7 @@ public class InLongMsgTlogKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral);
+                            nullLiteral, null);
             row.setField(i, field);
         }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index 598d7bd5dd..d70d6f18ba 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -209,5 +209,11 @@ public class InLongMsgTlogKvFormatDeserializerTest {
                 InLongMsgBody body, Exception exception) throws Exception {
             rowCount++;
         }
+
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
index 0e3f35180e..eb3029db71 100644
--- 
a/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
@@ -162,7 +162,7 @@ public final class KvDeserializationSchema extends 
DefaultDeserializationSchema<
                                 fieldName,
                                 fieldFormatInfo,
                                 fieldText,
-                                nullLiteral);
+                                nullLiteral, null);
                 row.setField(i, field);
             }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index f48c1c7a0f..f30d916341 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -76,6 +76,11 @@ public class InLongMsgUtils {
     public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time";
     public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = 
"inlongmsg_attributes";
 
+    public static final boolean DEFAULT_IS_RETAIN_PREDEFINED_FIELD = false;
+    public static final boolean DEFAULT_IS_DELETE_ESCAPE_CHAR = true;
+    public static final boolean DEFAULT_IS_PATCH_ESCAPE_CHAR = false;
+    public static final boolean DEFAULT_IS_DELETE_HEAD_DELIMITER = false;
+
     private static final FieldToRowDataConverters.FieldToRowDataConverter 
TIME_FIELD_CONVERTER =
             FieldToRowDataConverters.createConverter(new TimestampType());
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 53a0ba64f4..fca7ac1cdc 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -150,7 +150,8 @@ public final class InLongMsgBinlogFormatDeserializer 
extends AbstractInLongMsgFo
                 metadataFieldName,
                 head.getAttributes(),
                 body.getData(),
-                includeUpdateBefore);
+                includeUpdateBefore,
+                failureHandler);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 16ed2a7056..53c00d0daa 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.binlog.InLongBinlog;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
@@ -34,7 +35,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarCharType;
 
-import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -145,7 +145,7 @@ public class InLongMsgBinlogUtils {
             String metadataFieldName,
             Map<String, String> attributes,
             byte[] bytes,
-            boolean includeUpdateBefore) throws IOException {
+            boolean includeUpdateBefore, FailureHandler failureHandler) throws 
Exception {
 
         InLongBinlog.RowData rowData = InLongBinlog.RowData.parseFrom(bytes);
 
@@ -162,7 +162,8 @@ public class InLongMsgBinlogUtils {
                                 attributes,
                                 rowData,
                                 DBSYNC_OPERATION_INERT,
-                                rowData.getAfterColumnsList()));
+                                rowData.getAfterColumnsList(),
+                                failureHandler));
                 break;
             case UPDATE:
                 if (includeUpdateBefore) {
@@ -175,7 +176,8 @@ public class InLongMsgBinlogUtils {
                                     attributes,
                                     rowData,
                                     DBSYNC_OPERATION_UPDATE_BEFORE,
-                                    rowData.getBeforeColumnsList()));
+                                    rowData.getBeforeColumnsList(),
+                                    failureHandler));
                 }
                 rows.add(
                         constructRowData(
@@ -186,7 +188,8 @@ public class InLongMsgBinlogUtils {
                                 attributes,
                                 rowData,
                                 DBSYNC_OPERATION_UPDATE,
-                                rowData.getAfterColumnsList()));
+                                rowData.getAfterColumnsList(),
+                                failureHandler));
                 break;
             case DELETE:
                 rows.add(
@@ -198,7 +201,8 @@ public class InLongMsgBinlogUtils {
                                 attributes,
                                 rowData,
                                 DBSYNC_OPERATION_DELETE,
-                                rowData.getBeforeColumnsList()));
+                                rowData.getBeforeColumnsList(),
+                                failureHandler));
                 break;
             default:
                 return null;
@@ -215,7 +219,7 @@ public class InLongMsgBinlogUtils {
             Map<String, String> attributes,
             InLongBinlog.RowData rowData,
             String operation,
-            List<InLongBinlog.Column> columns) {
+            List<InLongBinlog.Column> columns, FailureHandler failureHandler) 
throws Exception {
         List<Object> headFields = new ArrayList<>();
 
         if (timeFieldName != null) {
@@ -270,7 +274,7 @@ public class InLongMsgBinlogUtils {
                                 fieldName,
                                 dataFieldFormatInfos[i],
                                 fieldText,
-                                null);
+                                null, failureHandler);
                 row.setField(i + headFields.size(), field);
             }
         }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index 3e9f0843f5..aac79533ef 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -44,6 +44,8 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_D
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_RETAIN_PREDEFINED_FIELD;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_RETAIN_PREDEFINED_FIELD;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
 
@@ -122,6 +124,11 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
      */
     private boolean retainPredefinedField = true;
 
+    /**
+     * True if escape characters are deleted while parsing.
+     */
+    private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
     public InLongMsgCsvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nullable String timeFieldName,
@@ -148,8 +155,9 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
                 nullLiteral,
                 deleteHeadDelimiter,
                 metadataKeys,
+                DEFAULT_IS_DELETE_ESCAPE_CHAR,
+                retainPredefinedField,
                 InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
-        this.retainPredefinedField = retainPredefinedField;
     }
 
     public InLongMsgCsvFormatDeserializer(
@@ -177,6 +185,8 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
                 nullLiteral,
                 deleteHeadDelimiter,
                 metadataKeys,
+                DEFAULT_IS_DELETE_ESCAPE_CHAR,
+                DEFAULT_IS_RETAIN_PREDEFINED_FIELD,
                 InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
     }
 
@@ -192,6 +202,8 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
             @Nullable String nullLiteral,
             boolean deleteHeadDelimiter,
             List<String> metadataKeys,
+            boolean isDeleteEscapeChar,
+            boolean retainPredefinedField,
             @Nonnull FailureHandler failureHandler) {
         super(failureHandler);
 
@@ -206,6 +218,8 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
         this.nullLiteral = nullLiteral;
         this.deleteHeadDelimiter = deleteHeadDelimiter;
         this.metadataKeys = metadataKeys;
+        this.isDeleteEscapeChar = isDeleteEscapeChar;
+        this.retainPredefinedField = retainPredefinedField;
 
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -236,17 +250,18 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
                 lineDelimiter,
                 escapeChar,
                 quoteChar,
-                deleteHeadDelimiter);
+                deleteHeadDelimiter,
+                isDeleteEscapeChar);
     }
 
     @Override
-    protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) {
+    protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) throws Exception {
         GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
                 rowFormatInfo,
                 nullLiteral,
                 retainPredefinedField ? head.getPredefinedFields() : 
Collections.emptyList(),
                 body.getFields(),
-                converters);
+                converters, failureHandler);
 
         // Decorate result with time and attributes fields if needed
         genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
@@ -272,6 +287,7 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
         private Boolean deleteHeadDelimiter = DEFAULT_DELETE_HEAD_DELIMITER;
         private List<String> metadataKeys = Collections.emptyList();
         private Boolean retainPredefinedField = 
DEFAULT_RETAIN_PREDEFINED_FIELD;
+        private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
 
         public Builder(RowFormatInfo rowFormatInfo) {
             super(rowFormatInfo);
@@ -312,7 +328,28 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
             return this;
         }
 
+        /**
+         * Sets whether escape characters are deleted while parsing.
+         *
+         * @param isDeleteEscapeChar the new flag value; {@code null} leaves the current value unchanged
+         * @return this builder
+         */
+        public Builder setDeleteEscapeChar(Boolean isDeleteEscapeChar) {
+            // The backing field is a primitive boolean: assigning a null Boolean would
+            // throw a NullPointerException on unboxing, so null is ignored instead.
+            if (isDeleteEscapeChar != null) {
+                this.isDeleteEscapeChar = isDeleteEscapeChar;
+            }
+            return this;
+        }
+
         public InLongMsgCsvFormatDeserializer build() {
+            if (failureHandler != null) {
+                return new InLongMsgCsvFormatDeserializer(
+                        rowFormatInfo,
+                        timeFieldName,
+                        attributesFieldName,
+                        charset,
+                        delimiter,
+                        lineDelimiter,
+                        escapeChar,
+                        quoteChar,
+                        nullLiteral,
+                        deleteHeadDelimiter,
+                        metadataKeys,
+                        isDeleteEscapeChar,
+                        retainPredefinedField, failureHandler);
+            }
             return new InLongMsgCsvFormatDeserializer(
                     rowFormatInfo,
                     timeFieldName,
@@ -327,6 +364,7 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
                     metadataKeys,
                     ignoreErrors,
                     retainPredefinedField);
+
         }
     }
 
@@ -355,7 +393,8 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
                 Objects.equals(escapeChar, that.escapeChar) &&
                 Objects.equals(quoteChar, that.quoteChar) &&
                 Objects.equals(nullLiteral, that.nullLiteral) &&
-                Objects.equals(metadataKeys, that.metadataKeys);
+                Objects.equals(metadataKeys, that.metadataKeys) &&
+                Objects.equals(isDeleteEscapeChar, that.isDeleteEscapeChar);
 
     }
 
@@ -363,7 +402,7 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
     public int hashCode() {
         return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
                 attributesFieldName, charset, delimiter, lineDelimiter, 
escapeChar, quoteChar,
-                nullLiteral, deleteHeadDelimiter, metadataKeys);
+                nullLiteral, deleteHeadDelimiter, metadataKeys, 
isDeleteEscapeChar);
     }
 
     @Override
@@ -380,6 +419,7 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
                 ", nullLiteral='" + nullLiteral + '\'' +
                 ", deleteHeadDelimiter=" + deleteHeadDelimiter +
                 ", metadataKeys=" + metadataKeys +
+                ", isDeleteEscapeChar=" + isDeleteEscapeChar +
                 '}';
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index a7f0fd1905..99a44d33c0 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
@@ -109,11 +110,12 @@ public class InLongMsgCsvUtils {
             Character lineDelimiter,
             Character escapeChar,
             Character quoteChar,
-            boolean deleteHeadDelimiter) {
+            boolean deleteHeadDelimiter, boolean isDeleteEscapeChar) {
         String bodyStr = new String(bytes, Charset.forName(charset));
 
         String[][] split =
-                splitCsv(bodyStr, delimiter, escapeChar, quoteChar, 
lineDelimiter, deleteHeadDelimiter);
+                splitCsv(bodyStr, delimiter, escapeChar, quoteChar, 
lineDelimiter,
+                        deleteHeadDelimiter, isDeleteEscapeChar);
 
         return Arrays.stream(split)
                 .map((line) -> {
@@ -132,7 +134,8 @@ public class InLongMsgCsvUtils {
             String nullLiteral,
             List<String> predefinedFields,
             List<String> fields,
-            FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+            FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -160,7 +163,7 @@ public class InLongMsgCsvUtils {
                     fieldName,
                     fieldFormatInfo,
                     fieldText,
-                    nullLiteral));
+                    nullLiteral, failureHandler));
             rowData.setField(i, field);
         }
 
@@ -180,7 +183,7 @@ public class InLongMsgCsvUtils {
                     fieldName,
                     fieldFormatInfo,
                     fieldText,
-                    nullLiteral));
+                    nullLiteral, failureHandler));
             rowData.setField(i + predefinedFields.size(), field);
         }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index fb8e00ab41..e7aef2c6b3 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -278,6 +278,8 @@ public class InLongMsgCsvFormatDeserializerTest {
                         null,
                         InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
                         Collections.emptyList(),
+                        true,
+                        true,
                         errorHandler);
 
         InLongMsg inLongMsg = InLongMsg.newInLongMsg();
@@ -290,7 +292,7 @@ public class InLongMsgCsvFormatDeserializerTest {
         List<RowData> actualRows = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg.buildArray(), collector);
-        assertEquals(0, errorHandler.getRowCount());
+        assertEquals(1, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = 
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -874,5 +876,11 @@ public class InLongMsgCsvFormatDeserializerTest {
                 Exception exception) throws Exception {
             rowCount++;
         }
+
+        /**
+         * Records a field conversion failure by incrementing {@code rowCount}; the exception is
+         * not rethrown.
+         */
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index b3dff0dec8..a9caf3bac5 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -44,6 +44,7 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_E
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 
 /**
@@ -120,6 +121,11 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
      */
     private boolean retainPredefinedField = true;
 
+    /**
+     * True if escape characters are deleted while parsing.
+     */
+    private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
     public InLongMsgKvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nullable String timeFieldName,
@@ -144,6 +150,7 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
                 escapeChar,
                 quoteChar,
                 nullLiteral,
+                DEFAULT_IS_DELETE_ESCAPE_CHAR,
                 InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
         if (retainPredefinedField != null) {
             this.retainPredefinedField = retainPredefinedField;
@@ -173,6 +180,7 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
                 escapeChar,
                 quoteChar,
                 nullLiteral,
+                DEFAULT_IS_DELETE_ESCAPE_CHAR,
                 InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
     }
 
@@ -187,6 +195,7 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
             @Nullable String nullLiteral,
+            @Nullable Boolean isDeleteEscapeChar,
             @Nonnull FailureHandler failureHandler) {
         super(failureHandler);
 
@@ -200,6 +209,7 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
         this.escapeChar = escapeChar;
         this.quoteChar = quoteChar;
         this.nullLiteral = nullLiteral;
+        this.isDeleteEscapeChar = isDeleteEscapeChar;
 
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -231,17 +241,19 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
                 kvDelimiter,
                 lineDelimiter,
                 escapeChar,
-                quoteChar);
+                quoteChar,
+                isDeleteEscapeChar);
     }
 
     @Override
-    protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) {
+    protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) throws Exception {
         GenericRowData genericRowData = InLongMsgKvUtils.deserializeRowData(
                 rowFormatInfo,
                 nullLiteral,
                 retainPredefinedField ? head.getPredefinedFields() : 
Collections.emptyList(),
                 body.getEntries(),
-                converters);
+                converters,
+                failureHandler);
 
         // Decorate result with time and attributes fields if needed
         return 
Collections.singletonList(InLongMsgUtils.decorateRowDataWithNeededHeadFields(
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 8bcf77c1ec..703a064384 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
@@ -91,7 +92,8 @@ public class InLongMsgKvUtils {
             char kvDelimiter,
             Character lineDelimiter,
             Character escapeChar,
-            Character quoteChar) {
+            Character quoteChar,
+            boolean isDeleteEscapeChar) {
         String text = new String(bytes, Charset.forName(charset));
 
         List<Map<String, String>> list =
@@ -101,7 +103,8 @@ public class InLongMsgKvUtils {
                         kvDelimiter,
                         escapeChar,
                         quoteChar,
-                        lineDelimiter);
+                        lineDelimiter,
+                        isDeleteEscapeChar);
 
         return list.stream().map((line) -> new InLongMsgBody(
                 bytes,
@@ -124,7 +127,8 @@ public class InLongMsgKvUtils {
             String nullLiteral,
             List<String> predefinedFields,
             Map<String, String> entries,
-            FieldToRowDataConverter[] converters) {
+            FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -146,7 +150,7 @@ public class InLongMsgKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral));
+                            nullLiteral, failureHandler));
             row.setField(i, field);
         }
 
@@ -161,7 +165,8 @@ public class InLongMsgKvUtils {
                     fieldName,
                     fieldFormatInfo,
                     fieldText,
-                    nullLiteral));
+                    nullLiteral,
+                    failureHandler));
             row.setField(i, field);
         }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index d99882c30f..c98c0c1f6c 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -84,6 +84,7 @@ public class InLongMsgKvFormatDeserializerTest {
                         null,
                         null,
                         null,
+                        true,
                         errorHandler);
 
         InLongMsg inLongMsg = InLongMsg.newInLongMsg();
@@ -96,7 +97,7 @@ public class InLongMsgKvFormatDeserializerTest {
         List<RowData> actualRows = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg.buildArray(), collector);
-        assertEquals(0, errorHandler.getRowCount());
+        assertEquals(1, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
         String abNormalAttrs = 
"m=0&iname=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -341,5 +342,11 @@ public class InLongMsgKvFormatDeserializerTest {
                 InLongMsgBody body, Exception exception) throws Exception {
             rowCount++;
         }
+
+        /**
+         * Records a field conversion failure by incrementing {@code rowCount}; the exception is
+         * not rethrown.
+         */
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index 814466f079..415c1efd64 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -41,7 +41,10 @@ import java.util.List;
 import java.util.Objects;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_HEAD_DELIMITER;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 
 /**
@@ -92,6 +95,9 @@ public final class InLongMsgTlogCsvFormatDeserializer extends 
AbstractInLongMsgF
     @Nullable
     private final Character quoteChar;
 
+    @Nullable
+    private final Character lineDelimiter;
+
     @Nonnull
     private Boolean isIncludeFirstSegment = false;
     /**
@@ -104,6 +110,14 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
 
     private final FieldToRowDataConverter[] converters;
 
+    /**
+     * True if escape characters are deleted while parsing.
+     */
+    private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
+    private boolean isDeleteHeadDelimiter = DEFAULT_IS_DELETE_HEAD_DELIMITER;
+
+    @Deprecated
     public InLongMsgTlogCsvFormatDeserializer(
             @Nonnull RowFormatInfo rowFormatInfo,
             @Nullable String timeFieldName,
@@ -123,9 +137,42 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
                 delimiter,
                 escapeChar,
                 quoteChar,
+                DEFAULT_LINE_DELIMITER,
                 nullLiteral,
                 metadataKeys,
+                DEFAULT_IS_DELETE_ESCAPE_CHAR,
                 false,
+                DEFAULT_IS_DELETE_HEAD_DELIMITER,
+                InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+    }
+
+    /**
+     * Creates a deserializer with an explicit line delimiter, deriving the failure handler from
+     * {@code ignoreErrors} through {@code InLongMsgUtils.getDefaultExceptionHandler}.
+     *
+     * <p>Delegates to the constructor that takes a {@code FailureHandler}, supplying
+     * {@code DEFAULT_IS_DELETE_ESCAPE_CHAR}, {@code false} for {@code isIncludeFirstSegment} and
+     * {@code DEFAULT_IS_DELETE_HEAD_DELIMITER} for the options this signature does not expose.
+     *
+     * @deprecated NOTE(review): presumably superseded by the constructor accepting a
+     *             {@code FailureHandler}, to which this one delegates; confirm the intended replacement.
+     */
+    @Deprecated
+    public InLongMsgTlogCsvFormatDeserializer(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter,
+            @Nullable String nullLiteral,
+            List<String> metadataKeys,
+            @Nonnull Boolean ignoreErrors) {
+        this(
+                rowFormatInfo,
+                timeFieldName,
+                attributesFieldName,
+                charset,
+                delimiter,
+                escapeChar,
+                quoteChar,
+                lineDelimiter,
+                nullLiteral,
+                metadataKeys,
+                DEFAULT_IS_DELETE_ESCAPE_CHAR,
+                false, // isIncludeFirstSegment
+                DEFAULT_IS_DELETE_HEAD_DELIMITER,
                 InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
     }
 
@@ -141,6 +188,29 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
             List<String> metadataKeys,
             @Nonnull Boolean isIncludeFirstSegment,
             @Nonnull FailureHandler failureHandler) {
+        this(rowFormatInfo, timeFieldName, attributesFieldName,
+                charset, delimiter, escapeChar, quoteChar,
+                DEFAULT_LINE_DELIMITER,
+                nullLiteral, metadataKeys, DEFAULT_IS_DELETE_ESCAPE_CHAR, 
isIncludeFirstSegment,
+                DEFAULT_IS_DELETE_HEAD_DELIMITER, failureHandler);
+
+    }
+
+    public InLongMsgTlogCsvFormatDeserializer(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter,
+            @Nullable String nullLiteral,
+            List<String> metadataKeys,
+            @Nonnull Boolean isDeleteEscapeChar,
+            @Nonnull Boolean isIncludeFirstSegment,
+            @Nonnull Boolean isDeleteHeadDelimiter,
+            @Nonnull FailureHandler failureHandler) {
         super(failureHandler);
 
         this.rowFormatInfo = rowFormatInfo;
@@ -150,9 +220,12 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
         this.delimiter = delimiter;
         this.escapeChar = escapeChar;
         this.quoteChar = quoteChar;
+        this.lineDelimiter = lineDelimiter;
         this.nullLiteral = nullLiteral;
         this.metadataKeys = metadataKeys;
         this.isIncludeFirstSegment = isIncludeFirstSegment;
+        this.isDeleteHeadDelimiter = isDeleteHeadDelimiter;
+        this.isDeleteEscapeChar = isDeleteEscapeChar;
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -175,9 +248,8 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
 
     @Override
     protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception 
{
-        return Collections.singletonList(
-                InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, 
escapeChar,
-                        quoteChar, isIncludeFirstSegment));
+        return InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, 
escapeChar,
+                quoteChar, lineDelimiter, isDeleteEscapeChar, 
isIncludeFirstSegment, isDeleteHeadDelimiter);
     }
 
     @Override
@@ -188,9 +260,9 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
                         nullLiteral,
                         head.getPredefinedFields(),
                         body.getFields(),
-                        converters);
+                        converters, failureHandler);
 
-        GenericRowData genericRowData = (GenericRowData) 
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+        GenericRowData genericRowData = 
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
                 timeFieldName,
                 attributesFieldName,
                 head.getTime(),
@@ -208,8 +280,11 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
         private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
         private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
         private Character delimiter = DEFAULT_DELIMITER;
+        private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
         private List<String> metadataKeys = Collections.emptyList();
         private boolean isIncludeFirstSegment = false;
+        private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+        private boolean isDeleteHeadDelimiter = 
DEFAULT_IS_DELETE_HEAD_DELIMITER;
 
         public Builder(RowFormatInfo rowFormatInfo) {
             super(rowFormatInfo);
@@ -230,6 +305,11 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
             return this;
         }
 
+        /**
+         * Sets the line delimiter that {@code build()} passes to the deserializer.
+         *
+         * @param lineDelimiter the line delimiter; stored as given (the deserializer's
+         *                      corresponding field is declared {@code @Nullable})
+         * @return this builder
+         */
+        public Builder setLineDelimiter(Character lineDelimiter) {
+            this.lineDelimiter = lineDelimiter;
+            return this;
+        }
+
         public Builder setMetadataKeys(List<String> metadataKeys) {
             this.metadataKeys = metadataKeys;
             return this;
@@ -240,6 +320,16 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
             return this;
         }
 
+        /**
+         * Sets whether escape characters are deleted while parsing.
+         *
+         * @param isDeleteEscapeChar the new flag value; {@code null} leaves the current value unchanged
+         * @return this builder
+         */
+        public Builder setDeleteEscapeChar(Boolean isDeleteEscapeChar) {
+            // The backing field is a primitive boolean: assigning a null Boolean would
+            // throw a NullPointerException on unboxing, so null is ignored instead.
+            if (isDeleteEscapeChar != null) {
+                this.isDeleteEscapeChar = isDeleteEscapeChar;
+            }
+            return this;
+        }
+
+        /**
+         * Sets the {@code isDeleteHeadDelimiter} flag that {@code build()} passes to the deserializer.
+         *
+         * @param isDeleteHeadDelimiter the new flag value; {@code null} leaves the current value unchanged
+         * @return this builder
+         */
+        public Builder setDeleteHeadDelimiter(Boolean isDeleteHeadDelimiter) {
+            // The backing field is a primitive boolean: assigning a null Boolean would
+            // throw a NullPointerException on unboxing, so null is ignored instead.
+            if (isDeleteHeadDelimiter != null) {
+                this.isDeleteHeadDelimiter = isDeleteHeadDelimiter;
+            }
+            return this;
+        }
+
         public InLongMsgTlogCsvFormatDeserializer build() {
             return new InLongMsgTlogCsvFormatDeserializer(
                     rowFormatInfo,
@@ -249,9 +339,12 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
                     delimiter,
                     escapeChar,
                     quoteChar,
+                    lineDelimiter,
                     nullLiteral,
                     metadataKeys,
+                    isDeleteEscapeChar,
                     isIncludeFirstSegment,
+                    isDeleteHeadDelimiter,
                     InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
         }
     }
@@ -278,15 +371,17 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
                 delimiter.equals(that.delimiter) &&
                 Objects.equals(escapeChar, that.escapeChar) &&
                 Objects.equals(quoteChar, that.quoteChar) &&
+                Objects.equals(lineDelimiter, that.lineDelimiter) &&
                 Objects.equals(nullLiteral, that.nullLiteral) &&
                 Objects.equals(metadataKeys, that.metadataKeys) &&
-                Objects.equals(isIncludeFirstSegment, 
that.isIncludeFirstSegment);
+                Objects.equals(isIncludeFirstSegment, 
that.isIncludeFirstSegment) &&
+                Objects.equals(isDeleteHeadDelimiter, 
that.isDeleteHeadDelimiter);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
-                attributesFieldName, charset, delimiter, escapeChar, quoteChar,
-                nullLiteral, metadataKeys, isIncludeFirstSegment);
+                attributesFieldName, charset, delimiter, escapeChar, 
quoteChar, lineDelimiter,
+                nullLiteral, metadataKeys, isIncludeFirstSegment, 
isDeleteHeadDelimiter);
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index a216cf3429..1e72a668b6 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
@@ -29,11 +30,10 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.Charset;
 import java.sql.Timestamp;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
@@ -75,13 +75,16 @@ public class InLongMsgTlogCsvUtils {
         return new InLongMsgHead(attributes, null, time, predefinedFields);
     }
 
-    public static InLongMsgBody parseBody(
+    public static List<InLongMsgBody> parseBody(
             byte[] bytes,
             String charset,
             char delimiter,
             Character escapeChar,
             Character quoteChar,
-            boolean isIncludeFirstSegment) {
+            Character lineDelimiter,
+            boolean isDeleteEscapeChar,
+            boolean isIncludeFirstSegment,
+            boolean isDeleteHeadDelimiter) {
         String text;
         if (bytes[0] == delimiter) {
             text = new String(bytes, 1, bytes.length - 1, 
Charset.forName(charset));
@@ -89,13 +92,25 @@ public class InLongMsgTlogCsvUtils {
             text = new String(bytes, Charset.forName(charset));
         }
 
-        String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
-
-        String tid = segments[0];
-        List<String> fields =
-                Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), 
segments.length).collect(Collectors.toList());
-
-        return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+        String[][] segments = splitCsv(text, delimiter, escapeChar, quoteChar, 
lineDelimiter,
+                isDeleteHeadDelimiter, isDeleteEscapeChar);
+        String tid = "";
+        List<InLongMsgBody> inLongMsgBodies = new ArrayList<>();
+        if (segments.length > 0) {
+            for (int i = 0; i < segments.length; i++) {
+                tid = segments[i][0];
+                List<String> fields = new ArrayList<>();
+                int startIndex = 1;
+                if (isIncludeFirstSegment) {
+                    startIndex = 0;
+                }
+                for (int j = startIndex; j < segments[i].length; j++) {
+                    fields.add(segments[i][j]);
+                }
+                inLongMsgBodies.add(new InLongMsgBody(null, tid, fields, 
Collections.emptyMap()));
+            }
+        }
+        return inLongMsgBodies;
     }
 
     /**
@@ -112,7 +127,8 @@ public class InLongMsgTlogCsvUtils {
             String nullLiteral,
             List<String> predefinedFields,
             List<String> fields,
-            FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+            FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -140,7 +156,7 @@ public class InLongMsgTlogCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral));
+                            nullLiteral, failureHandler));
             rowData.setField(i, field);
         }
 
@@ -160,7 +176,7 @@ public class InLongMsgTlogCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral));
+                            nullLiteral, failureHandler));
             rowData.setField(i + predefinedFields.size(), field);
         }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 7e83816b4d..5d52981f6d 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -89,14 +89,16 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
                         DEFAULT_DELIMITER,
                         null,
                         null,
+                        '\n',
                         null,
                         Collections.emptyList(),
                         false,
+                        false,
+                        false,
                         errorHandler);
-
         InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
         String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=test";
-        String body1 = "interfaceId1,field1,field2,field3";
+        String body1 = 
"interfaceId1,field1,field2,field3\ninterfaceId1,field1,field2,field3";
         String body2 = "interfaceId2,field1,field2,field3";
         inLongMsg1.addMsg(attrs, body1.getBytes());
         inLongMsg1.addMsg(attrs, body2.getBytes());
@@ -104,7 +106,8 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
         List<RowData> actualRows = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg1.buildArray(), collector);
-        assertEquals(0, errorHandler.getRowCount());
+        assertEquals(3, errorHandler.getRowCount());
+        assertEquals(3, actualRows.size());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
@@ -336,5 +339,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
                 throws Exception {
             rowCount++;
         }
+
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index dee2a5b791..2d54f9db13 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -45,12 +45,15 @@ import java.util.Objects;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
 
@@ -115,12 +118,17 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
     @Nullable
     private final Character quoteChar;
 
+    @Nullable
+    private final Character lineDelimiter;
+
     /**
      * The literal represented null values, default "".
      */
     @Nullable
     private final String nullLiteral;
 
+    private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
     private final FieldToRowDataConverter[] converters;
 
     public InLongMsgTlogKvFormatDeserializer(
@@ -133,6 +141,7 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
             @Nonnull Character kvDelimiter,
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter,
             @Nullable String nullLiteral,
             @Nonnull Boolean ignoreErrors) {
         this(
@@ -145,7 +154,9 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
                 kvDelimiter,
                 escapeChar,
                 quoteChar,
+                lineDelimiter,
                 nullLiteral,
+                DEFAULT_IS_DELETE_ESCAPE_CHAR,
                 InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
     }
 
@@ -159,7 +170,9 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
             @Nonnull Character kvDelimiter,
             @Nullable Character escapeChar,
             @Nullable Character quoteChar,
+            @Nullable Character lineDelimiter,
             @Nullable String nullLiteral,
+            @Nullable Boolean isDeleteEscapeChar,
             @Nonnull FailureHandler failureHandler) {
         super(failureHandler);
 
@@ -172,7 +185,9 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
         this.kvDelimiter = kvDelimiter;
         this.escapeChar = escapeChar;
         this.quoteChar = quoteChar;
+        this.lineDelimiter = lineDelimiter;
         this.nullLiteral = nullLiteral;
+        this.isDeleteEscapeChar = isDeleteEscapeChar;
 
         this.converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -193,15 +208,14 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
 
     @Override
     protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception 
{
-        return Collections.singletonList(
-                InLongMsgTlogKvUtils.parseBody(
-                        bytes,
-                        charset,
-                        delimiter,
-                        entryDelimiter,
-                        kvDelimiter,
-                        escapeChar,
-                        quoteChar));
+        return InLongMsgTlogKvUtils.parseBody(
+                bytes,
+                charset,
+                delimiter,
+                entryDelimiter,
+                kvDelimiter,
+                escapeChar,
+                quoteChar, lineDelimiter, isDeleteEscapeChar);
     }
 
     @Override
@@ -211,7 +225,7 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
                         rowFormatInfo,
                         nullLiteral,
                         head.getPredefinedFields(),
-                        body.getEntries(), converters);
+                        body.getEntries(), converters, failureHandler);
 
         RowData rowData = InLongMsgUtils.decorateRowWithNeededHeadFields(
                 timeFieldName,
@@ -233,6 +247,8 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
         private Character delimiter = DEFAULT_DELIMITER;
         private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
         private Character kvDelimiter = DEFAULT_KV_DELIMITER;
+        private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+        private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
 
         public Builder(RowFormatInfo rowFormatInfo) {
             super(rowFormatInfo);
@@ -265,6 +281,16 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
             return this;
         }
 
+        public Builder setLineDelimiter(Character lineDelimiter) {
+            this.lineDelimiter = lineDelimiter;
+            return this;
+        }
+
+        public Builder setDeleteEscapeChar(boolean isDeleteEscapeChar) {
+            this.isDeleteEscapeChar = isDeleteEscapeChar;
+            return this;
+        }
+
         public Builder configure(DescriptorProperties descriptorProperties) {
             super.configure(descriptorProperties);
 
@@ -278,11 +304,29 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
                     .ifPresent(this::setEntryDelimiter);
             descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER)
                     .ifPresent(this::setKvDelimiter);
+            descriptorProperties.getOptionalCharacter(FORMAT_LINE_DELIMITER)
+                    .ifPresent(this::setLineDelimiter);
 
             return this;
         }
 
         public InLongMsgTlogKvFormatDeserializer build() {
+            if (failureHandler != null) {
+                return new InLongMsgTlogKvFormatDeserializer(
+                        rowFormatInfo,
+                        timeFieldName,
+                        attributesFieldName,
+                        charset,
+                        delimiter,
+                        entryDelimiter,
+                        kvDelimiter,
+                        escapeChar,
+                        quoteChar,
+                        lineDelimiter,
+                        nullLiteral,
+                        isDeleteEscapeChar,
+                        failureHandler);
+            }
             return new InLongMsgTlogKvFormatDeserializer(
                     rowFormatInfo,
                     timeFieldName,
@@ -293,6 +337,7 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
                     kvDelimiter,
                     escapeChar,
                     quoteChar,
+                    lineDelimiter,
                     nullLiteral,
                     ignoreErrors);
         }
@@ -322,13 +367,15 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
                 kvDelimiter.equals(that.kvDelimiter) &&
                 Objects.equals(escapeChar, that.escapeChar) &&
                 Objects.equals(quoteChar, that.quoteChar) &&
-                Objects.equals(nullLiteral, that.nullLiteral);
+                Objects.equals(lineDelimiter, that.lineDelimiter) &&
+                Objects.equals(nullLiteral, that.nullLiteral) &&
+                Objects.equals(isDeleteEscapeChar, that.isDeleteEscapeChar);
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
                 attributesFieldName, charset, delimiter, entryDelimiter, 
kvDelimiter,
-                escapeChar, quoteChar, nullLiteral);
+                escapeChar, quoteChar, lineDelimiter, nullLiteral, 
isDeleteEscapeChar);
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 7e5dc6f864..f42caf3053 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
@@ -27,6 +28,7 @@ import org.apache.flink.table.data.GenericRowData;
 
 import java.nio.charset.Charset;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -69,14 +71,14 @@ public class InLongMsgTlogKvUtils {
         return new InLongMsgHead(attributes, null, time, predefinedFields);
     }
 
-    public static InLongMsgBody parseBody(
+    public static List<InLongMsgBody> parseBody(
             byte[] bytes,
             String charset,
             char delimiter,
             char entryDelimiter,
             char kvDelimiter,
             Character escapeChar,
-            Character quoteChar) {
+            Character quoteChar, Character lineDelimiter, boolean 
isDeleteEscapeChar) {
         String text;
         if (bytes[0] == delimiter) {
             text = new String(bytes, 1, bytes.length - 1, 
Charset.forName(charset));
@@ -87,15 +89,18 @@ public class InLongMsgTlogKvUtils {
         String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
 
         String streamId = segments[0];
-
-        Map<String, String> entries;
+        List<InLongMsgBody> inLongMsgBodies = new ArrayList<>();
+        List<Map<String, String>> entries;
         if (segments.length > 1) {
-            entries = splitKv(segments[1], entryDelimiter, kvDelimiter, 
escapeChar, quoteChar);
+            entries = splitKv(segments[1], entryDelimiter, kvDelimiter, 
escapeChar, quoteChar,
+                    lineDelimiter, isDeleteEscapeChar);
+            for (Map<String, String> maps : entries) {
+                inLongMsgBodies.add(new InLongMsgBody(null, streamId, 
Collections.emptyList(), maps));
+            }
         } else {
-            entries = Collections.emptyMap();
+            inLongMsgBodies.add(new InLongMsgBody(null, streamId, 
Collections.emptyList(), Collections.emptyMap()));
         }
-
-        return new InLongMsgBody(bytes, streamId, Collections.emptyList(), 
entries);
+        return inLongMsgBodies;
     }
 
     /**
@@ -112,7 +117,8 @@ public class InLongMsgTlogKvUtils {
             String nullLiteral,
             List<String> predefinedFields,
             Map<String, String> entries,
-            FieldToRowDataConverter[] converters) {
+            FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
@@ -133,7 +139,7 @@ public class InLongMsgTlogKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral));
+                            nullLiteral, failureHandler));
             rowData.setField(i, field);
         }
 
@@ -146,7 +152,7 @@ public class InLongMsgTlogKvUtils {
                     fieldName,
                     fieldFormatInfo,
                     fieldText,
-                    nullLiteral));
+                    nullLiteral, failureHandler));
             rowData.setField(i, field);
         }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index f8b4c9c519..490fded392 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -47,7 +47,9 @@ import static org.apache.flink.table.api.DataTypes.STRING;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
 import static org.junit.Assert.assertEquals;
@@ -85,7 +87,9 @@ public class InLongMsgTlogKvFormatDeserializerTest {
                         DEFAULT_KV_DELIMITER,
                         null,
                         null,
+                        DEFAULT_LINE_DELIMITER,
                         null,
+                        DEFAULT_IS_DELETE_ESCAPE_CHAR,
                         errorHandler);
 
         InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
@@ -98,7 +102,7 @@ public class InLongMsgTlogKvFormatDeserializerTest {
         List<RowData> actualRowDatas = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRowDatas);
         deserializer.flatMap(inLongMsg1.buildArray(), collector);
-        assertEquals(0, errorHandler.getRowCount());
+        assertEquals(2, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
@@ -113,10 +117,10 @@ public class InLongMsgTlogKvFormatDeserializerTest {
         InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
 
         String attrs = "m=0&t=20200322&__addcol1_=1&__addcol2_=2";
-        String body1 = "testInterfaceId1,f1=field1&f2=field2&f3=field3";
-        String body2 = "f1=field1&f2=field2&f3=field3";
-        String body3 = "f1=field1&f2=field2,f1=field1&f2=field2&f3=field3";
-        String body4 = ",testInterfaceId1,f1=field1&f2=field2&f3=field3";
+        String body1 = "testInterfaceId1,f1=field11&f2=field12&f3=field13";
+        String body2 = "f1=field21&f2=field22&f3=field23";
+        String body3 = "f1=field3&f2=field2,f1=field31&f2=field32&f3=field33";
+        String body4 = ",testInterfaceId1,f1=field41&f2=field42&f3=field43";
 
         inLongMsg1.addMsg(attrs, body1.getBytes());
         inLongMsg1.addMsg(attrs, body2.getBytes());
@@ -134,9 +138,9 @@ public class InLongMsgTlogKvFormatDeserializerTest {
         expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
         expectRowData1.setField(2, 1);
         expectRowData1.setField(3, 2);
-        expectRowData1.setField(4, StringData.fromString("field1"));
-        expectRowData1.setField(5, StringData.fromString("field2"));
-        expectRowData1.setField(6, StringData.fromString("field3"));
+        expectRowData1.setField(4, StringData.fromString("field11"));
+        expectRowData1.setField(5, StringData.fromString("field12"));
+        expectRowData1.setField(6, StringData.fromString("field13"));
 
         GenericRowData expectRowData2 = new GenericRowData(7);
         expectRowData2.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
@@ -152,18 +156,18 @@ public class InLongMsgTlogKvFormatDeserializerTest {
         expectRowData3.setField(1, mapConvert.convert(expectedAttributes));
         expectRowData3.setField(2, 1);
         expectRowData3.setField(3, 2);
-        expectRowData3.setField(4, StringData.fromString("field1"));
-        expectRowData3.setField(5, StringData.fromString("field2"));
-        expectRowData3.setField(6, StringData.fromString("field3"));
+        expectRowData3.setField(4, StringData.fromString("field31"));
+        expectRowData3.setField(5, StringData.fromString("field32"));
+        expectRowData3.setField(6, StringData.fromString("field33"));
 
         GenericRowData expectRowData4 = new GenericRowData(7);
         expectRowData4.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
         expectRowData4.setField(1, mapConvert.convert(expectedAttributes));
         expectRowData4.setField(2, 1);
         expectRowData4.setField(3, 2);
-        expectRowData4.setField(4, StringData.fromString("field1"));
-        expectRowData4.setField(5, StringData.fromString("field2"));
-        expectRowData4.setField(6, StringData.fromString("field3"));
+        expectRowData4.setField(4, StringData.fromString("field41"));
+        expectRowData4.setField(5, StringData.fromString("field42"));
+        expectRowData4.setField(6, StringData.fromString("field43"));
 
         testRowDataDeserialization(inLongMsg1.buildArray(),
                 Arrays.asList(expectRowData1, expectRowData2, expectRowData3, 
expectRowData4));
@@ -219,5 +223,11 @@ public class InLongMsgTlogKvFormatDeserializerTest {
                 InLongMsgBody body, Exception exception) throws Exception {
             rowCount++;
         }
+
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String 
fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount++;
+        }
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 22e5b5cb9e..eae08f62ce 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -47,13 +47,13 @@ import static org.junit.Assert.assertNull;
 public class TableFormatUtilsTest {
 
     @Test
-    public void testDeserializeStringWithoutNullLiteral() {
+    public void testDeserializeStringWithoutNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         StringFormatInfo.INSTANCE,
                         "data",
-                        null);
+                        null, null);
         assertEquals("data", result1);
 
         Object result2 =
@@ -61,7 +61,7 @@ public class TableFormatUtilsTest {
                         "f",
                         StringFormatInfo.INSTANCE,
                         "",
-                        null);
+                        null, null);
         assertEquals("", result2);
     }
 
@@ -85,13 +85,13 @@ public class TableFormatUtilsTest {
     }
 
     @Test
-    public void testDeserializeStringWithNullLiteral() {
+    public void testDeserializeStringWithNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         StringFormatInfo.INSTANCE,
                         "data",
-                        "n/a");
+                        "n/a", null);
         assertEquals("data", result1);
 
         Object result2 =
@@ -99,7 +99,7 @@ public class TableFormatUtilsTest {
                         "f",
                         StringFormatInfo.INSTANCE,
                         "",
-                        "n/a");
+                        "n/a", null);
         assertEquals("", result2);
 
         Object result3 =
@@ -107,7 +107,7 @@ public class TableFormatUtilsTest {
                         "f",
                         StringFormatInfo.INSTANCE,
                         "n/a",
-                        "n/a");
+                        "n/a", null);
         assertNull(result3);
     }
 
@@ -139,13 +139,13 @@ public class TableFormatUtilsTest {
     }
 
     @Test
-    public void testDeserializeNumberWithoutNullLiteral() {
+    public void testDeserializeNumberWithoutNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         IntFormatInfo.INSTANCE,
                         "1",
-                        null);
+                        null, null);
         assertEquals(1, result1);
 
         Object result2 =
@@ -153,7 +153,7 @@ public class TableFormatUtilsTest {
                         "f",
                         IntFormatInfo.INSTANCE,
                         "",
-                        null);
+                        null, null);
         assertNull(result2);
     }
 
@@ -177,13 +177,13 @@ public class TableFormatUtilsTest {
     }
 
     @Test
-    public void testDeserializeNumberWithNullLiteral() {
+    public void testDeserializeNumberWithNullLiteral() throws Exception {
         Object result1 =
                 deserializeBasicField(
                         "f",
                         IntFormatInfo.INSTANCE,
                         "1",
-                        "n/a");
+                        "n/a", null);
         assertEquals(1, result1);
 
         try {
@@ -191,7 +191,7 @@ public class TableFormatUtilsTest {
                     "f",
                     IntFormatInfo.INSTANCE,
                     "",
-                    "n/a");
+                    "n/a", null);
 
             assertNull(result2);
         } catch (Exception e) {
@@ -203,7 +203,7 @@ public class TableFormatUtilsTest {
                         "f",
                         IntFormatInfo.INSTANCE,
                         "n/a",
-                        "n/a");
+                        "n/a", null);
         assertNull(result3);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index 2a7182e39b..665b1b2fc5 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -23,6 +23,7 @@ import 
org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -132,6 +133,30 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
                 .toArray(FieldToRowDataConverter[]::new);
     }
 
+    public CsvRowDataDeserializationSchema( // constructor overload that takes an explicit FailureHandler
+            @Nonnull TypeInformation<RowData> resultTypeInfo,
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            FailureHandler failureHandler) { // NOTE(review): only parameter without @Nonnull/@Nullable; confirm whether null is accepted
+        super(failureHandler); // the handler is passed to the superclass constructor, not stored here
+        this.resultTypeInfo = resultTypeInfo;
+        this.rowFormatInfo = rowFormatInfo;
+        this.charset = charset;
+        this.delimiter = delimiter;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+        this.nullLiteral = nullLiteral;
+
+        converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos()) // one converter per field format, in field order
+                .map(formatInfo -> FieldToRowDataConverters.createConverter(
+                        TableFormatUtils.deriveLogicalType(formatInfo))) // converter is chosen from the logical type derived for the field
+                .toArray(FieldToRowDataConverter[]::new);
+    }
+
     /**
      * A builder for creating a {@link CsvRowDataDeserializationSchema}.
      */
@@ -236,7 +261,7 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
                                     fieldNames[i],
                                     fieldFormatInfos[i],
                                     fieldTexts[i],
-                                    nullLiteral);
+                                    nullLiteral, failureHandler);
 
                     rowData.setField(i, converters[i].convert(field));
                 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index 0f4e2b4a63..771cc5bd17 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
@@ -33,6 +34,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -144,13 +146,40 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
                 
.toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
     }
 
+    public KvRowDataDeserializationSchema( // constructor overload that takes an explicit FailureHandler
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull TypeInformation<RowData> producedTypeInfo,
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            @Nullable FailureHandler failureHandler) {
+        super(failureHandler); // the handler is passed to the superclass constructor, not stored here
+        this.rowFormatInfo = rowFormatInfo;
+        this.producedTypeInfo = producedTypeInfo;
+        this.charset = charset;
+        this.entryDelimiter = entryDelimiter;
+        this.kvDelimiter = kvDelimiter;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+        this.nullLiteral = nullLiteral;
+
+        converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos()) // one converter per field format, in field order
+                .map(formatInfo -> FieldToRowDataConverters.createConverter(
+                        
TableFormatForRowDataUtils.deriveLogicalType(formatInfo)))
+                
.toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
+    }
+
     @Override
     public RowData deserializeInternal(byte[] bytes) throws Exception {
         String text = new String(bytes, Charset.forName(charset));
         GenericRowData rowData = null;
         try {
-            Map<String, String> fieldTexts =
-                    splitKv(text, entryDelimiter, kvDelimiter, escapeChar, 
quoteChar);
+            List<Map<String, String>> fieldTexts =
+                    splitKv(text, entryDelimiter, kvDelimiter, escapeChar, 
quoteChar, null,
+                            true);
 
             String[] fieldNames = rowFormatInfo.getFieldNames();
             FormatInfo[] fieldFormatInfos = 
rowFormatInfo.getFieldFormatInfos();
@@ -160,13 +189,13 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
                 String fieldName = fieldNames[i];
                 FormatInfo fieldFormatInfo = fieldFormatInfos[i];
 
-                String fieldText = fieldTexts.get(fieldName);
+                String fieldText = fieldTexts.get(0).get(fieldName);
 
                 Object field = deserializeBasicField(
                         fieldName,
                         fieldFormatInfo,
                         fieldText,
-                        nullLiteral);
+                        nullLiteral, failureHandler);
                 rowData.setField(i, converters[i].convert(field));
             }
             return rowData;
@@ -199,6 +228,17 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
         }
 
         public KvRowDataDeserializationSchema build() {
+            if (failureHandler != null) {
+                return new KvRowDataDeserializationSchema(
+                        rowFormatInfo,
+                        producedTypeInfo,
+                        charset,
+                        entryDelimiter,
+                        kvDelimiter,
+                        escapeChar,
+                        quoteChar,
+                        nullLiteral, failureHandler);
+            }
             return new KvRowDataDeserializationSchema(
                     rowFormatInfo,
                     producedTypeInfo,
@@ -229,7 +269,8 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
                 kvDelimiter.equals(that.kvDelimiter) &&
                 Objects.equals(escapeChar, that.escapeChar) &&
                 Objects.equals(quoteChar, that.quoteChar) &&
-                Objects.equals(nullLiteral, that.nullLiteral);
+                Objects.equals(nullLiteral, that.nullLiteral) &&
+                Objects.equals(failureHandler, that.failureHandler);
     }
 
     @Override
@@ -242,6 +283,6 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
                 kvDelimiter,
                 escapeChar,
                 quoteChar,
-                nullLiteral);
+                nullLiteral, failureHandler);
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
index 9452395f59..2376ff198f 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
@@ -37,7 +37,8 @@ public class KvUtilsTest {
     @Test
     public void testSplitNormal() {
         List<Map<String, String>> list =
-                splitKv("f1=a\nf1=b", '&', '=', '\\', '\"', '\n');
+                splitKv("f1=a\nf1=b", '&', '=', '\\', '\"',
+                        '\n', true);
         List<Map<String, String>> expectedList = new ArrayList<>();
         expectedList.add(new HashMap<String, String>() {
 
@@ -195,6 +196,103 @@ public class KvUtilsTest {
                 splitKv("=a&f=", '&', '=', '\\', '\"'));
     }
 
+    @Test
+    public void testSplitNormalWithoutEscape() { // splitKv cases, mostly with the final flag false, where the expected output keeps the escape char
+        List<Map<String, String>> list =
+                splitKv("f1=a\nf1=b", '&', '=', '\\', '\"',
+                        '\n', false); // '\n' is passed as line delimiter: one map per line is expected
+        List<Map<String, String>> expectedList = new ArrayList<>();
+        expectedList.add(new HashMap<String, String>() {
+
+            {
+                put("f1", "a");
+            }
+        });
+        expectedList.add(new HashMap<String, String>() {
+
+            {
+                put("f1", "b");
+            }
+        });
+        assertEquals(expectedList, list); // expected first, actual second, so failure messages read correctly
+        List<Map<String, String>> expectedListMap = new ArrayList<>(); // typed instead of raw to avoid unchecked add/put
+        Map<String, String> expectedMap = new HashMap<>();
+        expectedMap.put("\\=f1", "a"); // escaped kv delimiter stays in the key together with the backslash
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        expectedListMap.add(expectedMap); // expectedMap is reused below: clear() + put() updates this list entry in place
+        assertEquals(expectedListMap,
+                splitKv("\\=f1=a&f2=b&f3=c", '&', '=', '\\',
+                        null, null, false));
+
+        expectedMap.clear();
+        expectedMap.put("\\&f1", "a"); // escaped entry delimiter stays in the key together with the backslash
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("\\&f1=a&f2=b&f3=c", '&', '=', '\\',
+                        null, null, false));
+
+        expectedMap.clear();
+        expectedMap.put("&f1", "a"); // quoted entry delimiter: quotes are removed, '&' becomes part of the key
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("\"&f1\"=a&f2=b&f3=c", '&', '=', '\\',
+                        '\"', null, false));
+
+        expectedMap.clear();
+        expectedMap.put("f1", "a\\&"); // escaped entry delimiter inside a value is kept with its backslash
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\\&&f2=b&f3=c", '&', '=', '\\',
+                        null, null, false));
+
+        expectedMap.clear();
+        expectedMap.put("f1", "a\\\\"); // escaped escape char: both backslashes are kept
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\\\\&f2=b&f3=c", '&', '=',
+                        '\\', null, null, false));
+
+        expectedMap.clear();
+        expectedMap.put("f1", "a&f2=b"); // delimiters inside quotes do not split; the quotes themselves are dropped
+        expectedMap.put("f3", "c");
+        expectedMap.put("f4", "d");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\"&f2=\"b&f3=c&f4=d", '&', '=',
+                        '\\', '\"', null, false));
+
+        expectedMap.clear();
+        expectedMap.put("f1", "atest\\test"); // backslash inside a quoted section is kept as-is
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\"test\\test\"&f2=b&f3=c", '&', '=', '\\',
+                        '\"', null, false));
+
+        expectedMap.clear();
+        expectedMap.put("f1", "a");
+        expectedMap.put("f2", "\\\"b"); // escaped quote char is kept with its backslash and does not open a quoted section
+        expectedMap.put("f3", "c\\\"");
+        expectedMap.put("f4", "d");
+        assertEquals(expectedListMap,
+                splitKv("f1=a&f2=\\\"b&f3=c\\\"&f4=d", '&', '=', '\\',
+                        '\"', null, false));
+
+        expectedMap.clear();
+        expectedMap.put("", "a"); // empty key and empty value are both expected to be preserved
+        expectedMap.put("f", "");
+        Map<String, String> expectedMap2 = new HashMap<>();
+        expectedMap2.put("c", "dd"); // second line of input; the backslash of d\\d is expected to be removed here
+        expectedListMap.add(expectedMap2);
+        assertEquals(expectedListMap,
+                splitKv("=a&f=\nc=d\\d", '&', '=', '\\',
+                        '\"', '\n', true)); // NOTE(review): final flag is true here, unlike the rest of this test — confirm intended
+    }
+
     @Test
     public void testSplitNestedValue() {
         Map<String, String> kvMap = splitKv("f1=a=a&f2=b&f3=c", '&', '=',


Reply via email to