This is an automated email from the ASF dual-hosted git repository.

baomingyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e101491385 [INLONG-12065][Sort] Sort Format supports outputting complete row information when errors occur in field parsing (#12066)
e101491385 is described below

commit e101491385be810ea49d1f9842b6ee56cd74ac1c
Author: Mingyu Bao <[email protected]>
AuthorDate: Fri Jan 16 14:12:15 2026 +0800

    [INLONG-12065][Sort] Sort Format supports outputting complete row information when errors occur in field parsing (#12066)
---
 .../inlong/sort/formats/base/TableFormatUtils.java | 72 ++++++++++++++--------
 .../sort/formats/inlongmsg/FailureHandler.java     | 30 +++++++++
 .../sort/formats/inlongmsg/InLongMsgBody.java      | 42 ++++++-------
 .../sort/formats/inlongmsg/InLongMsgWrap.java      | 17 +++--
 .../InLongMsgBinlogFormatDeserializer.java         |  2 +-
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java      |  1 +
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java    |  3 +-
 .../sort/formats/inlongmsgkv/InLongMsgKvUtils.java |  1 +
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java    |  2 +-
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java      |  2 +-
 .../AbstractInLongMsgFormatDeserializer.java       | 31 ++++++----
 .../InLongMsgBinlogFormatDeserializer.java         |  4 +-
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java      |  1 +
 .../InLongMsgCsvFormatDeserializer.java            | 17 ++---
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java    | 15 +++--
 .../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 10 +--
 .../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 31 +++++++---
 .../InLongMsgPbDeserializationSchema.java          |  3 +-
 .../InLongMsgTlogCsvFormatDeserializer.java        | 16 ++---
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java    | 15 +++--
 .../InLongMsgTlogKvFormatDeserializer.java         |  4 +-
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java      | 56 +++++++++--------
 .../formats/base/DefaultDeserializationSchema.java | 14 +++++
 .../csv/CsvRowDataDeserializationSchema.java       | 14 ++---
 .../formats/kv/KvRowDataDeserializationSchema.java |  4 +-
 25 files changed, 243 insertions(+), 164 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index dd1d3e1579..32adac5559 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -59,8 +59,9 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.TypeInfo;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatInfo;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.table.api.DataTypes;
@@ -551,6 +552,22 @@ public class TableFormatUtils {
             String fieldText,
             String nullLiteral,
             FailureHandler failureHandler) throws Exception {
+        return deserializeBasicField(fieldName, fieldFormatInfo, fieldText, 
nullLiteral,
+                null, null, null, failureHandler);
+    }
+
+    /**
+     * Deserializes the basic field.
+     */
+    public static Object deserializeBasicField(
+            String fieldName,
+            FormatInfo fieldFormatInfo,
+            String fieldText,
+            String nullLiteral,
+            InLongMsgHead head,
+            InLongMsgBody inLongMsgBody,
+            String originBody,
+            FailureHandler failureHandler) throws Exception {
         checkState(fieldFormatInfo instanceof BasicFormatInfo);
 
         if (fieldText == null) {
@@ -574,42 +591,49 @@ public class TableFormatUtils {
         try {
             return ((BasicFormatInfo<?>) 
fieldFormatInfo).deserialize(fieldText);
         } catch (Exception e) {
-            LOG.warn("Could not properly deserialize the " + "text "
-                    + fieldText + " for field " + fieldName + ".", e);
             if (failureHandler != null) {
-                failureHandler.onConvertingFieldFailure(fieldName, fieldText, 
fieldFormatInfo, e);
+                failureHandler.onConvertingFieldFailure(fieldName, fieldText, 
fieldFormatInfo,
+                        head, inLongMsgBody, originBody, e);
+            } else {
+                LOG.warn("Could not properly deserialize the" + "text: {},for 
field:{}"
+                        + ". predefinedFields = {},fields = {}, attr={}, 
originBody={}",
+                        fieldText, fieldName,
+                        head == null ? "" : head.getPredefinedFields(),
+                        inLongMsgBody == null ? "" : inLongMsgBody.getFields(),
+                        head == null ? "" : head.getAttributes(),
+                        originBody == null ? (inLongMsgBody == null ? "" : new 
String(inLongMsgBody.getDataBytes()))
+                                : originBody,
+                        e);
             }
         }
         return null;
     }
 
     public static long getFormatValueLength(FormatInfo fieldFormatInfo, String 
fieldText) {
-        if (fieldFormatInfo instanceof BooleanFormatInfo) {
-            return 4;
-        } else if (fieldFormatInfo instanceof ByteFormatInfo) {
-            return 4;
-        } else if (fieldFormatInfo instanceof BooleanFormatInfo) {
-            return 4;
-        } else if (fieldFormatInfo instanceof ShortFormatInfo) {
-            return 4;
-        } else if (fieldFormatInfo instanceof IntFormatInfo) {
-            return 4;
+        if (fieldFormatInfo instanceof StringFormatInfo) {
+            return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
         } else if (fieldFormatInfo instanceof LongFormatInfo) {
-            return 8;
-        } else if (fieldFormatInfo instanceof FloatFormatInfo) {
-            return 8;
+            return 24;
+        } else if (fieldFormatInfo instanceof IntFormatInfo) {
+            return 16;
         } else if (fieldFormatInfo instanceof DoubleFormatInfo) {
-            return 8;
-        } else if (fieldFormatInfo instanceof DecimalFormatInfo) {
-            return 8;
+            return 24;
+        } else if (fieldFormatInfo instanceof FloatFormatInfo) {
+            return 16;
         } else if (fieldFormatInfo instanceof DateFormatInfo
                 || fieldFormatInfo instanceof TimeFormatInfo
                 || fieldFormatInfo instanceof TimestampFormatInfo) {
-            return 8;
-        } else if (StringUtils.isNotEmpty(fieldText)) {
-            return fieldText.length();
+            return 24;
+        } else if (fieldFormatInfo instanceof BooleanFormatInfo) {
+            return 16;
+        } else if (fieldFormatInfo instanceof ByteFormatInfo) {
+            return 16;
+        } else if (fieldFormatInfo instanceof ShortFormatInfo) {
+            return 16;
+        } else if (fieldFormatInfo instanceof DecimalFormatInfo) {
+            return 24;
         }
-        return 0L;
+        return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
     }
 
     /**
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
index e07b9fc815..b7afcb9797 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
@@ -85,4 +85,34 @@ public interface FailureHandler extends Serializable {
     void onConvertingFieldFailure(String fieldName, String fieldText, 
FormatInfo formatInfo,
             Exception exception) throws Exception;
 
+    /**
+     * This method is called when there is a failure occurred while converting 
any field to row.
+     *
+     * @param fieldName the filed name
+     * @param fieldText the filed test
+     * @param formatInfo the filed target type info
+     * @param exception the thrown exception
+     * @param head the predefined fields
+     * @param inLongMsgBody the fields
+     * @param originBody the origin body
+     * @throws Exception the exception
+     */
+    default void onConvertingFieldFailure(String fieldName, String fieldText, 
FormatInfo formatInfo,
+            InLongMsgHead head, InLongMsgBody inLongMsgBody, String originBody,
+            Exception exception) throws Exception {
+        onConvertingFieldFailure(fieldName, fieldText, formatInfo, exception);
+    }
+
+    /**
+     * This method is called when there is a failure occurred while field num 
error.
+     *
+     * @param predefinedFields predefined fields
+     * @param originBodyBytes origin body bytes
+     * @param originBody origin body
+     * @param actualNumFields actual number of fields
+     * @param fieldNameSize expected number of fields
+     */
+    default void onFieldNumError(String predefinedFields, byte[] 
originBodyBytes, String originBody,
+            int actualNumFields, int fieldNameSize) {
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
index 77a7812b22..f25a62ea9c 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
@@ -25,6 +28,7 @@ import java.util.Map;
 /**
  * The body deserialized from {@link InLongMsgBody}.
  */
+@Data
 public class InLongMsgBody implements Serializable {
 
     private static final long serialVersionUID = 1L;
@@ -32,7 +36,12 @@ public class InLongMsgBody implements Serializable {
     /**
      * The body of the record.
      */
-    private final byte[] data;
+    private final byte[] dataBytes;
+
+    /**
+     * The body of the record.
+     */
+    private final String data;
 
     /**
      * The interface of the record.
@@ -50,32 +59,18 @@ public class InLongMsgBody implements Serializable {
     private final Map<String, String> entries;
 
     public InLongMsgBody(
-            byte[] data,
+            byte[] dataBytes,
+            String data,
             String streamId,
             List<String> fields,
             Map<String, String> entries) {
+        this.dataBytes = dataBytes;
         this.data = data;
         this.streamId = streamId;
         this.fields = fields;
         this.entries = entries;
     }
 
-    public byte[] getData() {
-        return data;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public List<String> getFields() {
-        return fields;
-    }
-
-    public Map<String, String> getEntries() {
-        return entries;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -87,17 +82,22 @@ public class InLongMsgBody implements Serializable {
         }
 
         InLongMsgBody inLongMsgBody = (InLongMsgBody) o;
-        return Arrays.equals(data, inLongMsgBody.data);
+        return StringUtils.equals(data, inLongMsgBody.data)
+                && Arrays.equals(dataBytes, inLongMsgBody.dataBytes);
     }
 
     @Override
     public int hashCode() {
-        return Arrays.hashCode(data);
+        if (dataBytes != null) {
+            return Arrays.hashCode(dataBytes);
+        }
+        return data == null ? super.hashCode() : data.hashCode();
     }
 
     @Override
     public String toString() {
-        return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", 
streamId='" + streamId + '\''
+        return "InLongMsgBody{" + "data=" + (data == null ? new 
String(dataBytes) : data)
+                + ", streamId='" + streamId + '\''
                 + ", fields=" + fields + ", entries=" + entries + '}';
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
index fba2731142..ddb88d01e5 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
@@ -17,28 +17,27 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import lombok.Data;
+
 import java.io.Serializable;
 import java.util.List;
 
 /**
  * The body deserialized from {@link InLongMsgWrap}.
  */
+@Data
 public class InLongMsgWrap implements Serializable {
 
     private final InLongMsgHead inLongMsgHead;
 
     private final List<InLongMsgBody> inLongMsgBodyList;
 
-    public InLongMsgWrap(InLongMsgHead inLongMsgHead, List<InLongMsgBody> 
inLongMsgBodyList) {
+    private final byte[] originBody;
+
+    public InLongMsgWrap(InLongMsgHead inLongMsgHead,
+            List<InLongMsgBody> inLongMsgBodyList, byte[] originBody) {
         this.inLongMsgHead = inLongMsgHead;
         this.inLongMsgBodyList = inLongMsgBodyList;
-    }
-
-    public InLongMsgHead getInLongMsgHead() {
-        return inLongMsgHead;
-    }
-
-    public List<InLongMsgBody> getInLongMsgBodyList() {
-        return inLongMsgBodyList;
+        this.originBody = originBody;
     }
 }
\ No newline at end of file
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 9cb52d3d07..a0ec713e1c 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -158,7 +158,7 @@ public final class InLongMsgBinlogFormatDeserializer 
extends AbstractInLongMsgFo
                 attributesFieldName,
                 metadataFieldName,
                 head.getAttributes(),
-                body.getData(),
+                body.getDataBytes(),
                 includeUpdateBefore);
     }
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 4eb3a042f6..546cac181a 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -138,6 +138,7 @@ public class InLongMsgBinlogUtils {
         return new InLongMsgBody(
                 bytes,
                 null,
+                null,
                 Collections.emptyList(),
                 Collections.emptyMap());
     }
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index e4e4e5659e..8a6297dbbd 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -116,7 +116,8 @@ public class InLongMsgCsvUtils {
                     // Only parsed fields will be used by downstream, so it's 
safe to leave
                     // the other parameters empty.
                     return new InLongMsgBody(
-                            null,
+                            bytes,
+                            bodyStr,
                             null,
                             Arrays.asList(line),
                             Collections.emptyMap());
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index d8414ca038..f9362a5db8 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -106,6 +106,7 @@ public class InLongMsgKvUtils {
         return list.stream().map((line) -> {
             return new InLongMsgBody(
                     bytes,
+                    text,
                     null,
                     Collections.emptyList(),
                     line);
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 82bc8e2ebc..b17c54bb63 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -94,7 +94,7 @@ public class InLongMsgTlogCsvUtils {
         List<String> fields =
                 Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), 
segments.length).collect(Collectors.toList());
 
-        return new InLongMsgBody(bytes, streamId, fields, 
Collections.emptyMap());
+        return new InLongMsgBody(bytes, text, streamId, fields, 
Collections.emptyMap());
     }
 
     /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 6982b5bea0..60689f17c3 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -94,7 +94,7 @@ public class InLongMsgTlogKvUtils {
             entries = Collections.emptyMap();
         }
 
-        return new InLongMsgBody(bytes, streamId, Collections.emptyList(), 
entries);
+        return new InLongMsgBody(bytes, text, streamId, 
Collections.emptyList(), entries);
     }
 
     /**
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index 43c937a5a7..1b4283b6d7 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -33,7 +33,6 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -47,8 +46,6 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
 
-    protected long lastPrintTimestamp = 0L;
-    protected long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
     protected int fieldNameSize = 0;
 
     protected FailureHandler failureHandler;
@@ -84,15 +81,6 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
 
     protected abstract List<FormatMsg> convertFormatMsgList(InLongMsgHead 
head, InLongMsgBody body) throws Exception;
 
-    protected boolean needPrint() {
-        long now = Instant.now().toEpochMilli();
-        if (now - lastPrintTimestamp > PRINT_TIMESTAMP_INTERVAL) {
-            lastPrintTimestamp = now;
-            return true;
-        }
-        return false;
-    }
-
     public void flatMap(
             byte[] bytes,
             Collector<RowData> collector) throws Exception {
@@ -160,7 +148,7 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
                     continue;
                 }
 
-                result.add(new InLongMsgWrap(head, bodyList));
+                result.add(new InLongMsgWrap(head, bodyList, bodyBytes));
             }
         }
 
@@ -234,4 +222,21 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
     public void setFormatMetricGroup(FormatMetricGroup formatMetricGroup) {
         this.formatMetricGroup = formatMetricGroup;
     }
+
+    protected void checkFieldNameSize(InLongMsgHead head, InLongMsgBody body,
+            int actualNumFields, int fieldNameSize,
+            FailureHandler failureHandler) {
+        if (actualNumFields != fieldNameSize) {
+            if (failureHandler != null) {
+                
failureHandler.onFieldNumError(StringUtils.join(head.getPredefinedFields(), 
","),
+                        body.getDataBytes(), body.getData(),
+                        actualNumFields, fieldNameSize);
+            } else {
+                LOG.warn("The number of fields mismatches: {}"
+                        + ",expected, but was {}. origin text: {}, 
PredefinedFields: {}",
+                        fieldNameSize, actualNumFields, body,
+                        StringUtils.join(head.getPredefinedFields(), ","));
+            }
+        }
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 4d491ae018..ed05d104b5 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -150,7 +150,7 @@ public final class InLongMsgBinlogFormatDeserializer 
extends AbstractInLongMsgFo
                 attributesFieldName,
                 metadataFieldName,
                 head.getAttributes(),
-                body.getData(),
+                body.getDataBytes(),
                 includeUpdateBefore,
                 failureHandler);
     }
@@ -163,7 +163,7 @@ public final class InLongMsgBinlogFormatDeserializer 
extends AbstractInLongMsgFo
                 attributesFieldName,
                 metadataFieldName,
                 head.getAttributes(),
-                body.getData(),
+                body.getDataBytes(),
                 includeUpdateBefore,
                 failureHandler);
     }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 78612399ef..f720d8976f 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -103,6 +103,7 @@ public class InLongMsgBinlogUtils {
         return new InLongMsgBody(
                 bytes,
                 null,
+                null,
                 Collections.emptyList(),
                 Collections.emptyMap());
     }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index 5263e1334d..7592e288b6 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -268,11 +268,7 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
         List<String> fields = body.getFields();
         int actualNumFields = (predefinedFields == null ? 0 : 
predefinedFields.size())
                 + (fields == null ? 0 : fields.size());
-        if (needPrint() && actualNumFields != fieldNameSize) {
-            LOG.warn("The number of fields mismatches: expected={}, actual={}. 
" +
-                    "PredefinedFields=[{}], Fields=[{}]", fieldNameSize, 
actualNumFields,
-                    predefinedFields, fields);
-        }
+        checkFieldNameSize(head, body, actualNumFields, fieldNameSize, 
failureHandler);
 
         GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
                 rowFormatInfo,
@@ -300,17 +296,14 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
         int actualNumFields = (predefinedFields == null ? 0 : 
predefinedFields.size())
                 + (fields == null ? 0 : fields.size());
 
-        if (needPrint() && actualNumFields != fieldNameSize) {
-            LOG.warn("The number of fields mismatches: expected={}, actual={}. 
" +
-                    "PredefinedFields=[{}], Fields=[{}]", fieldNameSize, 
actualNumFields,
-                    predefinedFields, fields);
-        }
+        checkFieldNameSize(head, body, actualNumFields, fieldNameSize, 
failureHandler);
 
         FormatMsg formatMsg = InLongMsgCsvUtils.deserializeFormatMsgData(
                 rowFormatInfo,
                 nullLiteral,
-                retainPredefinedField ? head.getPredefinedFields() : 
Collections.emptyList(),
-                body.getFields(),
+                retainPredefinedField,
+                head,
+                body,
                 converters, failureHandler);
 
         // Decorate result with time and attributes fields if needed
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index 212086a89b..493d707538 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -124,7 +124,8 @@ public class InLongMsgCsvUtils {
                     // Only parsed fields will be used by downstream, so it's 
safe to leave
                     // the other parameters empty.
                     return new InLongMsgBody(
-                            null,
+                            bytes,
+                            bodyStr,
                             null,
                             Arrays.asList(line),
                             Collections.emptyMap());
@@ -134,13 +135,15 @@ public class InLongMsgCsvUtils {
     public static FormatMsg deserializeFormatMsgData(
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
-            List<String> predefinedFields,
-            List<String> fields,
+            boolean retainPredefinedField,
+            InLongMsgHead head,
+            InLongMsgBody body,
             FieldToRowDataConverters.FieldToRowDataConverter[] converters,
             FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
-
+        List<String> predefinedFields = retainPredefinedField ? 
head.getPredefinedFields() : Collections.emptyList();
+        List<String> fields = body.getFields();
         GenericRowData rowData = new GenericRowData(fieldNames.length);
         long rowDataLength = 0L;
         // Deserialize pre-defined fields
@@ -158,7 +161,7 @@ public class InLongMsgCsvUtils {
                     fieldName,
                     fieldFormatInfo,
                     fieldText,
-                    nullLiteral, failureHandler));
+                    nullLiteral, head, body, body.getData(), failureHandler));
             rowData.setField(i, field);
             rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
         }
@@ -179,7 +182,7 @@ public class InLongMsgCsvUtils {
                     fieldName,
                     fieldFormatInfo,
                     fieldText,
-                    nullLiteral, failureHandler));
+                    nullLiteral, head, body, body.getData(), failureHandler));
             rowData.setField(i + predefinedFields.size(), field);
             rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
         }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index 7cb030471c..b524f34188 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -254,8 +254,9 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
         GenericRowData genericRowData = InLongMsgKvUtils.deserializeRowData(
                 rowFormatInfo,
                 nullLiteral,
-                retainPredefinedField ? head.getPredefinedFields() : 
Collections.emptyList(),
-                body.getEntries(),
+                retainPredefinedField,
+                head,
+                body,
                 converters,
                 failureHandler);
 
@@ -272,8 +273,9 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
         FormatMsg formatMsg = InLongMsgKvUtils.deserializeFormatMsgData(
                 rowFormatInfo,
                 nullLiteral,
-                retainPredefinedField ? head.getPredefinedFields() : 
Collections.emptyList(),
-                body.getEntries(),
+                retainPredefinedField,
+                head,
+                body,
                 converters,
                 failureHandler);
 
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 772e5619d7..f61c18a5ef 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -110,6 +110,7 @@ public class InLongMsgKvUtils {
 
         return list.stream().map((line) -> new InLongMsgBody(
                 bytes,
+                text,
                 null,
                 Collections.emptyList(),
                 line)).collect(Collectors.toList());
@@ -120,20 +121,24 @@ public class InLongMsgKvUtils {
      *
      * @param rowFormatInfo    The format of the fields.
      * @param nullLiteral      The literal for null values.
-     * @param predefinedFields The predefined fields.
-     * @param entries          The entries.
+     * @param retainPredefinedField Whether to retain predefined fields.
+     * @param head The message head; supplies the predefined fields when they are retained.
+     * @param inLongMsgBody The message body that supplies the entries.
      * @return The row deserialized from the given entries.
      */
     public static GenericRowData deserializeRowData(
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
-            List<String> predefinedFields,
-            Map<String, String> entries,
+            boolean retainPredefinedField,
+            InLongMsgHead head,
+            InLongMsgBody inLongMsgBody,
             FieldToRowDataConverter[] converters,
             FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
+        List<String> predefinedFields = retainPredefinedField ? 
head.getPredefinedFields() : Collections.emptyList();
+        Map<String, String> entries = inLongMsgBody.getEntries();
         GenericRowData row = new GenericRowData(fieldNames.length);
 
         for (int i = 0; i < predefinedFields.size(); ++i) {
@@ -152,7 +157,7 @@ public class InLongMsgKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral, failureHandler));
+                            nullLiteral, head, inLongMsgBody, 
inLongMsgBody.getData(), failureHandler));
             row.setField(i, field);
         }
 
@@ -168,6 +173,7 @@ public class InLongMsgKvUtils {
                     fieldFormatInfo,
                     fieldText,
                     nullLiteral,
+                    head, inLongMsgBody, inLongMsgBody.getData(),
                     failureHandler));
             row.setField(i, field);
         }
@@ -178,13 +184,16 @@ public class InLongMsgKvUtils {
     public static FormatMsg deserializeFormatMsgData(
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
-            List<String> predefinedFields,
-            Map<String, String> entries,
+            boolean retainPredefinedField,
+            InLongMsgHead head,
+            InLongMsgBody inLongMsgBody,
             FieldToRowDataConverter[] converters,
             FailureHandler failureHandler) throws Exception {
-        String[] fieldNames = rowFormatInfo.getFieldNames();
-        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
+        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+        List<String> predefinedFields = retainPredefinedField ? 
head.getPredefinedFields() : Collections.emptyList();
+        Map<String, String> entries = inLongMsgBody.getEntries();
+        String[] fieldNames = rowFormatInfo.getFieldNames();
         GenericRowData row = new GenericRowData(fieldNames.length);
         long rowDataLength = 0L;
         for (int i = 0; i < predefinedFields.size(); ++i) {
@@ -203,7 +212,8 @@ public class InLongMsgKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral, failureHandler));
+                            nullLiteral, head, inLongMsgBody, 
inLongMsgBody.getData(),
+                            failureHandler));
             row.setField(i, field);
             rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
         }
@@ -220,6 +230,7 @@ public class InLongMsgKvUtils {
                     fieldFormatInfo,
                     fieldText,
                     nullLiteral,
+                    head, inLongMsgBody, inLongMsgBody.getData(),
                     failureHandler));
             row.setField(i, field);
             rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
index fbbfb1e7c3..dfc636a79c 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
@@ -93,7 +93,8 @@ public class InLongMsgPbDeserializationSchema implements 
DeserializationSchema<R
         }
     }
 
-    public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out) 
throws Exception {
+    public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out,
+            byte[] originBody) throws Exception {
         byte[] decompressed = decompressor.decompress(message);
         MessageObjs msgObjs = MessageObjs.parseFrom(decompressed);
         List<MessageObj> msgList = msgObjs.getMsgsList();
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index ab095ddaa8..f4f5a8fa24 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -267,10 +267,7 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
         List<String> fields = body.getFields();
         int actualNumFields = (predefinedFields == null ? 0 : 
predefinedFields.size())
                 + (fields == null ? 0 : fields.size());
-        if (needPrint() && actualNumFields != fieldNameSize) {
-            LOG.warn("The number of fields mismatches: " + fieldNameSize +
-                    " expected, but was " + actualNumFields + ".");
-        }
+        checkFieldNameSize(head, body, actualNumFields, fieldNameSize, 
failureHandler);
         GenericRowData dataRow =
                 InLongMsgTlogCsvUtils.deserializeRowData(
                         rowFormatInfo,
@@ -294,18 +291,13 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
         List<String> fields = body.getFields();
         int actualNumFields = (predefinedFields == null ? 0 : 
predefinedFields.size())
                 + (fields == null ? 0 : fields.size());
-
-        if (needPrint() && actualNumFields != fieldNameSize) {
-            LOG.warn("The number of fields mismatches: " + fieldNameSize +
-                    " expected, but was " + actualNumFields + ".");
-        }
-
+        checkFieldNameSize(head, body, actualNumFields, fieldNameSize, 
failureHandler);
         FormatMsg formatMsg =
                 InLongMsgTlogCsvUtils.deserializeFormatMsgData(
                         rowFormatInfo,
                         nullLiteral,
-                        head.getPredefinedFields(),
-                        body.getFields(),
+                        head,
+                        body,
                         converters, failureHandler);
 
         GenericRowData genericRowData = 
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index db4787f83b..60020e4d18 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -109,7 +109,7 @@ public class InLongMsgTlogCsvUtils {
                 for (int j = startIndex; j < segments[i].length; j++) {
                     fields.add(segments[i][j]);
                 }
-                inLongMsgBodies.add(new InLongMsgBody(null, tid, fields, 
Collections.emptyMap()));
+                inLongMsgBodies.add(new InLongMsgBody(bytes, text, tid, 
fields, Collections.emptyMap()));
             }
         }
         return inLongMsgBodies;
@@ -184,13 +184,13 @@ public class InLongMsgTlogCsvUtils {
     }
 
     public static FormatMsg deserializeFormatMsgData(RowFormatInfo 
rowFormatInfo,
-            String nullLiteral,
-            List<String> predefinedFields,
-            List<String> fields,
+            String nullLiteral, InLongMsgHead head, InLongMsgBody 
inLongMsgBody,
             FieldToRowDataConverters.FieldToRowDataConverter[] converters,
             FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+        List<String> predefinedFields = head.getPredefinedFields();
+        List<String> fields = inLongMsgBody.getFields();
 
         GenericRowData rowData = new GenericRowData(fieldNames.length);
         long rowDataLength = 0L;
@@ -211,7 +211,9 @@ public class InLongMsgTlogCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral, failureHandler));
+                            nullLiteral, head, inLongMsgBody,
+                            inLongMsgBody.getData(),
+                            failureHandler));
             rowData.setField(i, field);
             rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
         }
@@ -232,7 +234,8 @@ public class InLongMsgTlogCsvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral, failureHandler));
+                            nullLiteral, head, inLongMsgBody, 
inLongMsgBody.getData(),
+                            failureHandler));
             rowData.setField(i + predefinedFields.size(), field);
             rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
         }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index dff67d924f..9fe68bf5fe 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -247,8 +247,8 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
                 InLongMsgTlogKvUtils.deserializeFormatMsgData(
                         rowFormatInfo,
                         nullLiteral,
-                        head.getPredefinedFields(),
-                        body.getEntries(), converters, failureHandler);
+                        head,
+                        body, converters, failureHandler);
 
         RowData rowData = InLongMsgUtils.decorateRowWithNeededHeadFields(
                 timeFieldName,
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 57893cbde3..f58eacd881 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -97,10 +97,11 @@ public class InLongMsgTlogKvUtils {
             entries = splitKv(segments[1], entryDelimiter, kvDelimiter, 
escapeChar, quoteChar,
                     lineDelimiter, isDeleteEscapeChar);
             for (Map<String, String> maps : entries) {
-                inLongMsgBodies.add(new InLongMsgBody(null, streamId, 
Collections.emptyList(), maps));
+                inLongMsgBodies.add(new InLongMsgBody(bytes, text, streamId, 
Collections.emptyList(), maps));
             }
         } else {
-            inLongMsgBodies.add(new InLongMsgBody(null, streamId, 
Collections.emptyList(), Collections.emptyMap()));
+            inLongMsgBodies.add(new InLongMsgBody(bytes, text, streamId, 
Collections.emptyList(),
+                    Collections.emptyMap()));
         }
         return inLongMsgBodies;
     }
@@ -141,7 +142,8 @@ public class InLongMsgTlogKvUtils {
                             fieldName,
                             fieldFormatInfo,
                             fieldText,
-                            nullLiteral, failureHandler));
+                            nullLiteral,
+                            failureHandler));
             rowData.setField(i, field);
         }
 
@@ -166,43 +168,45 @@ public class InLongMsgTlogKvUtils {
      *
      * @param rowFormatInfo The format information of the row.
      * @param nullLiteral The literal for null values.
-     * @param predefinedFields The predefined fields.
-     * @param entries The entries.
+     * @param head The message head that supplies the predefined fields.
+     * @param inLongMsgBody The message body that supplies the entries.
      * @return The row FormatMsg from the given entries.
      */
     public static FormatMsg deserializeFormatMsgData(
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
-            List<String> predefinedFields,
-            Map<String, String> entries,
+            InLongMsgHead head, InLongMsgBody inLongMsgBody,
             FieldToRowDataConverter[] converters,
             FailureHandler failureHandler) throws Exception {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
+        List<String> predefinedFields = head.getPredefinedFields();
+        Map<String, String> entries = inLongMsgBody.getEntries();
         GenericRowData rowData = new GenericRowData(fieldNames.length);
         long rowDataLength = 0L;
-        for (int i = 0; i < predefinedFields.size(); ++i) {
-
-            if (i >= fieldNames.length) {
-                break;
+        int predefinedFieldLength = predefinedFields == null ? 0 : 
predefinedFields.size();
+        if (predefinedFieldLength > 0) {
+            for (int i = 0; i < predefinedFieldLength; ++i) {
+                if (i >= fieldNames.length) {
+                    break;
+                }
+                String fieldName = fieldNames[i];
+                FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+                String fieldText = predefinedFields.get(i);
+                FieldToRowDataConverter converter = converters[i];
+                Object field = converter.convert(
+                        deserializeBasicField(
+                                fieldName,
+                                fieldFormatInfo,
+                                fieldText,
+                                nullLiteral, head, inLongMsgBody, 
inLongMsgBody.getData(), failureHandler));
+                rowData.setField(i, field);
+                rowDataLength += getFormatValueLength(fieldFormatInfos[i], 
fieldText);
             }
-
-            String fieldName = fieldNames[i];
-            FormatInfo fieldFormatInfo = fieldFormatInfos[i];
-            String fieldText = predefinedFields.get(i);
-            FieldToRowDataConverter converter = converters[i];
-            Object field = converter.convert(
-                    deserializeBasicField(
-                            fieldName,
-                            fieldFormatInfo,
-                            fieldText,
-                            nullLiteral, failureHandler));
-            rowData.setField(i, field);
-            rowDataLength += getFormatValueLength(fieldFormatInfos[i], 
fieldText);
         }
 
-        for (int i = predefinedFields.size(); i < fieldNames.length; ++i) {
+        for (int i = predefinedFieldLength; i < fieldNames.length; ++i) {
             String fieldName = fieldNames[i];
             FormatInfo fieldFormatInfo = fieldFormatInfos[i];
             String fieldText = entries.get(fieldName);
@@ -211,7 +215,7 @@ public class InLongMsgTlogKvUtils {
                     fieldName,
                     fieldFormatInfo,
                     fieldText,
-                    nullLiteral, failureHandler));
+                    nullLiteral, head, inLongMsgBody, inLongMsgBody.getData(), 
failureHandler));
             rowData.setField(i, field);
             rowDataLength += getFormatValueLength(fieldFormatInfos[i], 
fieldText);
         }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
index c0525efa99..16ceb4549a 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
@@ -137,6 +137,20 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
         return Objects.equals(failureHandler, that.failureHandler);
     }
 
+    protected void checkFieldNameSize(String body, int actualNumFields, int 
fieldNameSize,
+            FailureHandler failureHandler) {
+        if (actualNumFields != fieldNameSize) {
+            if (failureHandler != null) {
+                failureHandler.onFieldNumError(null, null, body,
+                        actualNumFields, fieldNameSize);
+            } else {
+                LOG.warn("The number of fields mismatches: {}"
+                        + ",expected, but was {}. origin text: {}",
+                        fieldNameSize, actualNumFields, body);
+            }
+        }
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(failureHandler);
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index c705c4344f..e8353ea0b4 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -255,10 +255,7 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
 
             String[] fieldTexts = splitCsv(text, delimiter, escapeChar, 
quoteChar);
 
-            if (needPrint() && fieldTexts.length != fieldNameSize) {
-                LOG.warn("The number of fields mismatches: expected=[{}], 
actual=[{}]. Text=[{}].",
-                        fieldNames.length, fieldTexts.length, text);
-            }
+            checkFieldNameSize(text, fieldTexts.length, fieldNameSize, 
failureHandler);
 
             GenericRowData rowData = new GenericRowData(fieldNames.length);
 
@@ -271,7 +268,7 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
                                     fieldNames[i],
                                     fieldFormatInfos[i],
                                     fieldTexts[i],
-                                    nullLiteral, failureHandler);
+                                    nullLiteral, null, null, text, 
failureHandler);
 
                     rowData.setField(i, converters[i].convert(field));
                 }
@@ -297,10 +294,7 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
 
             String[] fieldTexts = splitCsv(text, delimiter, escapeChar, 
quoteChar);
 
-            if (needPrint() && fieldTexts.length != fieldNameSize) {
-                LOG.warn("The number of fields mismatches: expected=[{}], 
actual=[{}]. Text=[{}].",
-                        fieldNames.length, fieldTexts.length, text);
-            }
+            checkFieldNameSize(text, fieldTexts.length, fieldNameSize, 
failureHandler);
 
             GenericRowData rowData = new GenericRowData(fieldNames.length);
 
@@ -313,7 +307,7 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
                                     fieldNames[i],
                                     fieldFormatInfos[i],
                                     fieldTexts[i],
-                                    nullLiteral, failureHandler);
+                                    nullLiteral, null, null, text, 
failureHandler);
 
                     rowData.setField(i, converters[i].convert(field));
                     rowDataLength += getFormatValueLength(fieldFormatInfos[i], 
fieldTexts[i]);
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index 200719d145..e96e31cde2 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -202,7 +202,7 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
                         fieldName,
                         fieldFormatInfo,
                         fieldText,
-                        nullLiteral, failureHandler);
+                        nullLiteral, null, null, text, failureHandler);
                 rowData.setField(i, converters[i].convert(field));
             }
             return rowData;
@@ -237,7 +237,7 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
                         fieldName,
                         fieldFormatInfo,
                         fieldText,
-                        nullLiteral, failureHandler);
+                        nullLiteral, null, null, text, failureHandler);
                 rowData.setField(i, converters[i].convert(field));
                 rowDataLength += getFormatValueLength(fieldFormatInfo, 
fieldText);
             }

Reply via email to