This is an automated email from the ASF dual-hosted git repository.
baomingyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e101491385 [INLONG-12065][Sort] Sort Format supports outputting
complete row information when errors occur in field parsing (#12066)
e101491385 is described below
commit e101491385be810ea49d1f9842b6ee56cd74ac1c
Author: Mingyu Bao <[email protected]>
AuthorDate: Fri Jan 16 14:12:15 2026 +0800
[INLONG-12065][Sort] Sort Format supports outputting complete row
information when errors occur in field parsing (#12066)
---
.../inlong/sort/formats/base/TableFormatUtils.java | 72 ++++++++++++++--------
.../sort/formats/inlongmsg/FailureHandler.java | 30 +++++++++
.../sort/formats/inlongmsg/InLongMsgBody.java | 42 ++++++-------
.../sort/formats/inlongmsg/InLongMsgWrap.java | 17 +++--
.../InLongMsgBinlogFormatDeserializer.java | 2 +-
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 1 +
.../formats/inlongmsgcsv/InLongMsgCsvUtils.java | 3 +-
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 1 +
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 2 +-
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java | 2 +-
.../AbstractInLongMsgFormatDeserializer.java | 31 ++++++----
.../InLongMsgBinlogFormatDeserializer.java | 4 +-
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 1 +
.../InLongMsgCsvFormatDeserializer.java | 17 ++---
.../formats/inlongmsgcsv/InLongMsgCsvUtils.java | 15 +++--
.../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 10 +--
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 31 +++++++---
.../InLongMsgPbDeserializationSchema.java | 3 +-
.../InLongMsgTlogCsvFormatDeserializer.java | 16 ++---
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 15 +++--
.../InLongMsgTlogKvFormatDeserializer.java | 4 +-
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java | 56 +++++++++--------
.../formats/base/DefaultDeserializationSchema.java | 14 +++++
.../csv/CsvRowDataDeserializationSchema.java | 14 ++---
.../formats/kv/KvRowDataDeserializationSchema.java | 4 +-
25 files changed, 243 insertions(+), 164 deletions(-)
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index dd1d3e1579..32adac5559 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -59,8 +59,9 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.TypeInfo;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatInfo;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
@@ -551,6 +552,22 @@ public class TableFormatUtils {
String fieldText,
String nullLiteral,
FailureHandler failureHandler) throws Exception {
+ return deserializeBasicField(fieldName, fieldFormatInfo, fieldText,
nullLiteral,
+ null, null, null, failureHandler);
+ }
+
+ /**
+ * Deserializes the basic field.
+ */
+ public static Object deserializeBasicField(
+ String fieldName,
+ FormatInfo fieldFormatInfo,
+ String fieldText,
+ String nullLiteral,
+ InLongMsgHead head,
+ InLongMsgBody inLongMsgBody,
+ String originBody,
+ FailureHandler failureHandler) throws Exception {
checkState(fieldFormatInfo instanceof BasicFormatInfo);
if (fieldText == null) {
@@ -574,42 +591,49 @@ public class TableFormatUtils {
try {
return ((BasicFormatInfo<?>)
fieldFormatInfo).deserialize(fieldText);
} catch (Exception e) {
- LOG.warn("Could not properly deserialize the " + "text "
- + fieldText + " for field " + fieldName + ".", e);
if (failureHandler != null) {
- failureHandler.onConvertingFieldFailure(fieldName, fieldText,
fieldFormatInfo, e);
+ failureHandler.onConvertingFieldFailure(fieldName, fieldText,
fieldFormatInfo,
+ head, inLongMsgBody, originBody, e);
+ } else {
+ LOG.warn("Could not properly deserialize the" + "text: {},for
field:{}"
+ + ". predefinedFields = {},fields = {}, attr={},
originBody={}",
+ fieldText, fieldName,
+ head == null ? "" : head.getPredefinedFields(),
+ inLongMsgBody == null ? "" : inLongMsgBody.getFields(),
+ head == null ? "" : head.getAttributes(),
+ originBody == null ? (inLongMsgBody == null ? "" : new
String(inLongMsgBody.getDataBytes()))
+ : originBody,
+ e);
}
}
return null;
}
public static long getFormatValueLength(FormatInfo fieldFormatInfo, String
fieldText) {
- if (fieldFormatInfo instanceof BooleanFormatInfo) {
- return 4;
- } else if (fieldFormatInfo instanceof ByteFormatInfo) {
- return 4;
- } else if (fieldFormatInfo instanceof BooleanFormatInfo) {
- return 4;
- } else if (fieldFormatInfo instanceof ShortFormatInfo) {
- return 4;
- } else if (fieldFormatInfo instanceof IntFormatInfo) {
- return 4;
+ if (fieldFormatInfo instanceof StringFormatInfo) {
+ return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
} else if (fieldFormatInfo instanceof LongFormatInfo) {
- return 8;
- } else if (fieldFormatInfo instanceof FloatFormatInfo) {
- return 8;
+ return 24;
+ } else if (fieldFormatInfo instanceof IntFormatInfo) {
+ return 16;
} else if (fieldFormatInfo instanceof DoubleFormatInfo) {
- return 8;
- } else if (fieldFormatInfo instanceof DecimalFormatInfo) {
- return 8;
+ return 24;
+ } else if (fieldFormatInfo instanceof FloatFormatInfo) {
+ return 16;
} else if (fieldFormatInfo instanceof DateFormatInfo
|| fieldFormatInfo instanceof TimeFormatInfo
|| fieldFormatInfo instanceof TimestampFormatInfo) {
- return 8;
- } else if (StringUtils.isNotEmpty(fieldText)) {
- return fieldText.length();
+ return 24;
+ } else if (fieldFormatInfo instanceof BooleanFormatInfo) {
+ return 16;
+ } else if (fieldFormatInfo instanceof ByteFormatInfo) {
+ return 16;
+ } else if (fieldFormatInfo instanceof ShortFormatInfo) {
+ return 16;
+ } else if (fieldFormatInfo instanceof DecimalFormatInfo) {
+ return 24;
}
- return 0L;
+ return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
}
/**
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
index e07b9fc815..b7afcb9797 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
@@ -85,4 +85,34 @@ public interface FailureHandler extends Serializable {
void onConvertingFieldFailure(String fieldName, String fieldText,
FormatInfo formatInfo,
Exception exception) throws Exception;
+ /**
+ * This method is called when a failure occurs while converting
any field to a row.
+ *
+ * @param fieldName the field name
+ * @param fieldText the field text
+ * @param formatInfo the field target type info
+ * @param exception the thrown exception
+ * @param head the predefined fields
+ * @param inLongMsgBody the fields
+ * @param originBody the origin body
+ * @throws Exception the exception
+ */
+ default void onConvertingFieldFailure(String fieldName, String fieldText,
FormatInfo formatInfo,
+ InLongMsgHead head, InLongMsgBody inLongMsgBody, String originBody,
+ Exception exception) throws Exception {
+ onConvertingFieldFailure(fieldName, fieldText, formatInfo, exception);
+ }
+
+ /**
+ * This method is called when the actual number of fields does not match
the expected number.
+ *
+ * @param predefinedFields predefined fields
+ * @param originBodyBytes origin body bytes
+ * @param originBody origin body
+ * @param actualNumFields actual number of fields
+ * @param fieldNameSize expected number of fields
+ */
+ default void onFieldNumError(String predefinedFields, byte[]
originBodyBytes, String originBody,
+ int actualNumFields, int fieldNameSize) {
+ }
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
index 77a7812b22..f25a62ea9c 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
@@ -25,6 +28,7 @@ import java.util.Map;
/**
* The body deserialized from {@link InLongMsgBody}.
*/
+@Data
public class InLongMsgBody implements Serializable {
private static final long serialVersionUID = 1L;
@@ -32,7 +36,12 @@ public class InLongMsgBody implements Serializable {
/**
* The body of the record.
*/
- private final byte[] data;
+ private final byte[] dataBytes;
+
+ /**
+ * The body of the record as text.
+ */
+ private final String data;
/**
* The interface of the record.
@@ -50,32 +59,18 @@ public class InLongMsgBody implements Serializable {
private final Map<String, String> entries;
public InLongMsgBody(
- byte[] data,
+ byte[] dataBytes,
+ String data,
String streamId,
List<String> fields,
Map<String, String> entries) {
+ this.dataBytes = dataBytes;
this.data = data;
this.streamId = streamId;
this.fields = fields;
this.entries = entries;
}
- public byte[] getData() {
- return data;
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public List<String> getFields() {
- return fields;
- }
-
- public Map<String, String> getEntries() {
- return entries;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -87,17 +82,22 @@ public class InLongMsgBody implements Serializable {
}
InLongMsgBody inLongMsgBody = (InLongMsgBody) o;
- return Arrays.equals(data, inLongMsgBody.data);
+ return StringUtils.equals(data, inLongMsgBody.data)
+ && Arrays.equals(dataBytes, inLongMsgBody.dataBytes);
}
@Override
public int hashCode() {
- return Arrays.hashCode(data);
+ if (dataBytes != null) {
+ return Arrays.hashCode(dataBytes);
+ }
+ return data == null ? super.hashCode() : data.hashCode();
}
@Override
public String toString() {
- return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ",
streamId='" + streamId + '\''
+ return "InLongMsgBody{" + "data=" + (data == null ? new
String(dataBytes) : data)
+ + ", streamId='" + streamId + '\''
+ ", fields=" + fields + ", entries=" + entries + '}';
}
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
index fba2731142..ddb88d01e5 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java
@@ -17,28 +17,27 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.List;
/**
* The body deserialized from {@link InLongMsgWrap}.
*/
+@Data
public class InLongMsgWrap implements Serializable {
private final InLongMsgHead inLongMsgHead;
private final List<InLongMsgBody> inLongMsgBodyList;
- public InLongMsgWrap(InLongMsgHead inLongMsgHead, List<InLongMsgBody>
inLongMsgBodyList) {
+ private final byte[] originBody;
+
+ public InLongMsgWrap(InLongMsgHead inLongMsgHead,
+ List<InLongMsgBody> inLongMsgBodyList, byte[] originBody) {
this.inLongMsgHead = inLongMsgHead;
this.inLongMsgBodyList = inLongMsgBodyList;
- }
-
- public InLongMsgHead getInLongMsgHead() {
- return inLongMsgHead;
- }
-
- public List<InLongMsgBody> getInLongMsgBodyList() {
- return inLongMsgBodyList;
+ this.originBody = originBody;
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 9cb52d3d07..a0ec713e1c 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -158,7 +158,7 @@ public final class InLongMsgBinlogFormatDeserializer
extends AbstractInLongMsgFo
attributesFieldName,
metadataFieldName,
head.getAttributes(),
- body.getData(),
+ body.getDataBytes(),
includeUpdateBefore);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 4eb3a042f6..546cac181a 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -138,6 +138,7 @@ public class InLongMsgBinlogUtils {
return new InLongMsgBody(
bytes,
null,
+ null,
Collections.emptyList(),
Collections.emptyMap());
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index e4e4e5659e..8a6297dbbd 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -116,7 +116,8 @@ public class InLongMsgCsvUtils {
// Only parsed fields will be used by downstream, so it's
safe to leave
// the other parameters empty.
return new InLongMsgBody(
- null,
+ bytes,
+ bodyStr,
null,
Arrays.asList(line),
Collections.emptyMap());
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index d8414ca038..f9362a5db8 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -106,6 +106,7 @@ public class InLongMsgKvUtils {
return list.stream().map((line) -> {
return new InLongMsgBody(
bytes,
+ text,
null,
Collections.emptyList(),
line);
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 82bc8e2ebc..b17c54bb63 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -94,7 +94,7 @@ public class InLongMsgTlogCsvUtils {
List<String> fields =
Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1),
segments.length).collect(Collectors.toList());
- return new InLongMsgBody(bytes, streamId, fields,
Collections.emptyMap());
+ return new InLongMsgBody(bytes, text, streamId, fields,
Collections.emptyMap());
}
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 6982b5bea0..60689f17c3 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -94,7 +94,7 @@ public class InLongMsgTlogKvUtils {
entries = Collections.emptyMap();
}
- return new InLongMsgBody(bytes, streamId, Collections.emptyList(),
entries);
+ return new InLongMsgBody(bytes, text, streamId,
Collections.emptyList(), entries);
}
/**
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index 43c937a5a7..1b4283b6d7 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -33,7 +33,6 @@ import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -47,8 +46,6 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
private static final Logger LOG =
LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
- protected long lastPrintTimestamp = 0L;
- protected long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
protected int fieldNameSize = 0;
protected FailureHandler failureHandler;
@@ -84,15 +81,6 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
protected abstract List<FormatMsg> convertFormatMsgList(InLongMsgHead
head, InLongMsgBody body) throws Exception;
- protected boolean needPrint() {
- long now = Instant.now().toEpochMilli();
- if (now - lastPrintTimestamp > PRINT_TIMESTAMP_INTERVAL) {
- lastPrintTimestamp = now;
- return true;
- }
- return false;
- }
-
public void flatMap(
byte[] bytes,
Collector<RowData> collector) throws Exception {
@@ -160,7 +148,7 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
continue;
}
- result.add(new InLongMsgWrap(head, bodyList));
+ result.add(new InLongMsgWrap(head, bodyList, bodyBytes));
}
}
@@ -234,4 +222,21 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
public void setFormatMetricGroup(FormatMetricGroup formatMetricGroup) {
this.formatMetricGroup = formatMetricGroup;
}
+
+ protected void checkFieldNameSize(InLongMsgHead head, InLongMsgBody body,
+ int actualNumFields, int fieldNameSize,
+ FailureHandler failureHandler) {
+ if (actualNumFields != fieldNameSize) {
+ if (failureHandler != null) {
+
failureHandler.onFieldNumError(StringUtils.join(head.getPredefinedFields(),
","),
+ body.getDataBytes(), body.getData(),
+ actualNumFields, fieldNameSize);
+ } else {
+ LOG.warn("The number of fields mismatches: {}"
+ + ",expected, but was {}. origin text: {},
PredefinedFields: {}",
+ fieldNameSize, actualNumFields, body,
+ StringUtils.join(head.getPredefinedFields(), ","));
+ }
+ }
+ }
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 4d491ae018..ed05d104b5 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -150,7 +150,7 @@ public final class InLongMsgBinlogFormatDeserializer
extends AbstractInLongMsgFo
attributesFieldName,
metadataFieldName,
head.getAttributes(),
- body.getData(),
+ body.getDataBytes(),
includeUpdateBefore,
failureHandler);
}
@@ -163,7 +163,7 @@ public final class InLongMsgBinlogFormatDeserializer
extends AbstractInLongMsgFo
attributesFieldName,
metadataFieldName,
head.getAttributes(),
- body.getData(),
+ body.getDataBytes(),
includeUpdateBefore,
failureHandler);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 78612399ef..f720d8976f 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -103,6 +103,7 @@ public class InLongMsgBinlogUtils {
return new InLongMsgBody(
bytes,
null,
+ null,
Collections.emptyList(),
Collections.emptyMap());
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index 5263e1334d..7592e288b6 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -268,11 +268,7 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
List<String> fields = body.getFields();
int actualNumFields = (predefinedFields == null ? 0 :
predefinedFields.size())
+ (fields == null ? 0 : fields.size());
- if (needPrint() && actualNumFields != fieldNameSize) {
- LOG.warn("The number of fields mismatches: expected={}, actual={}.
" +
- "PredefinedFields=[{}], Fields=[{}]", fieldNameSize,
actualNumFields,
- predefinedFields, fields);
- }
+ checkFieldNameSize(head, body, actualNumFields, fieldNameSize,
failureHandler);
GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
rowFormatInfo,
@@ -300,17 +296,14 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
int actualNumFields = (predefinedFields == null ? 0 :
predefinedFields.size())
+ (fields == null ? 0 : fields.size());
- if (needPrint() && actualNumFields != fieldNameSize) {
- LOG.warn("The number of fields mismatches: expected={}, actual={}.
" +
- "PredefinedFields=[{}], Fields=[{}]", fieldNameSize,
actualNumFields,
- predefinedFields, fields);
- }
+ checkFieldNameSize(head, body, actualNumFields, fieldNameSize,
failureHandler);
FormatMsg formatMsg = InLongMsgCsvUtils.deserializeFormatMsgData(
rowFormatInfo,
nullLiteral,
- retainPredefinedField ? head.getPredefinedFields() :
Collections.emptyList(),
- body.getFields(),
+ retainPredefinedField,
+ head,
+ body,
converters, failureHandler);
// Decorate result with time and attributes fields if needed
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index 212086a89b..493d707538 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -124,7 +124,8 @@ public class InLongMsgCsvUtils {
// Only parsed fields will be used by downstream, so it's
safe to leave
// the other parameters empty.
return new InLongMsgBody(
- null,
+ bytes,
+ bodyStr,
null,
Arrays.asList(line),
Collections.emptyMap());
@@ -134,13 +135,15 @@ public class InLongMsgCsvUtils {
public static FormatMsg deserializeFormatMsgData(
RowFormatInfo rowFormatInfo,
String nullLiteral,
- List<String> predefinedFields,
- List<String> fields,
+ boolean retainPredefinedField,
+ InLongMsgHead head,
+ InLongMsgBody body,
FieldToRowDataConverters.FieldToRowDataConverter[] converters,
FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
-
+ List<String> predefinedFields = retainPredefinedField ?
head.getPredefinedFields() : Collections.emptyList();
+ List<String> fields = body.getFields();
GenericRowData rowData = new GenericRowData(fieldNames.length);
long rowDataLength = 0L;
// Deserialize pre-defined fields
@@ -158,7 +161,7 @@ public class InLongMsgCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral, head, body, body.getData(), failureHandler));
rowData.setField(i, field);
rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
}
@@ -179,7 +182,7 @@ public class InLongMsgCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral, head, body, body.getData(), failureHandler));
rowData.setField(i + predefinedFields.size(), field);
rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index 7cb030471c..b524f34188 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -254,8 +254,9 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
GenericRowData genericRowData = InLongMsgKvUtils.deserializeRowData(
rowFormatInfo,
nullLiteral,
- retainPredefinedField ? head.getPredefinedFields() :
Collections.emptyList(),
- body.getEntries(),
+ retainPredefinedField,
+ head,
+ body,
converters,
failureHandler);
@@ -272,8 +273,9 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
FormatMsg formatMsg = InLongMsgKvUtils.deserializeFormatMsgData(
rowFormatInfo,
nullLiteral,
- retainPredefinedField ? head.getPredefinedFields() :
Collections.emptyList(),
- body.getEntries(),
+ retainPredefinedField,
+ head,
+ body,
converters,
failureHandler);
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 772e5619d7..f61c18a5ef 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -110,6 +110,7 @@ public class InLongMsgKvUtils {
return list.stream().map((line) -> new InLongMsgBody(
bytes,
+ text,
null,
Collections.emptyList(),
line)).collect(Collectors.toList());
@@ -120,20 +121,24 @@ public class InLongMsgKvUtils {
*
* @param rowFormatInfo The format of the fields.
* @param nullLiteral The literal for null values.
- * @param predefinedFields The predefined fields.
- * @param entries The entries.
+ * @param retainPredefinedField Whether to retain predefined fields.
+ * @param head The predefined fields.
+ * @param inLongMsgBody The entries.
* @return The row deserialized from the given entries.
*/
public static GenericRowData deserializeRowData(
RowFormatInfo rowFormatInfo,
String nullLiteral,
- List<String> predefinedFields,
- Map<String, String> entries,
+ boolean retainPredefinedField,
+ InLongMsgHead head,
+ InLongMsgBody inLongMsgBody,
FieldToRowDataConverter[] converters,
FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+ List<String> predefinedFields = retainPredefinedField ?
head.getPredefinedFields() : Collections.emptyList();
+ Map<String, String> entries = inLongMsgBody.getEntries();
GenericRowData row = new GenericRowData(fieldNames.length);
for (int i = 0; i < predefinedFields.size(); ++i) {
@@ -152,7 +157,7 @@ public class InLongMsgKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral, head, inLongMsgBody,
inLongMsgBody.getData(), failureHandler));
row.setField(i, field);
}
@@ -168,6 +173,7 @@ public class InLongMsgKvUtils {
fieldFormatInfo,
fieldText,
nullLiteral,
+ head, inLongMsgBody, inLongMsgBody.getData(),
failureHandler));
row.setField(i, field);
}
@@ -178,13 +184,16 @@ public class InLongMsgKvUtils {
public static FormatMsg deserializeFormatMsgData(
RowFormatInfo rowFormatInfo,
String nullLiteral,
- List<String> predefinedFields,
- Map<String, String> entries,
+ boolean retainPredefinedField,
+ InLongMsgHead head,
+ InLongMsgBody inLongMsgBody,
FieldToRowDataConverter[] converters,
FailureHandler failureHandler) throws Exception {
- String[] fieldNames = rowFormatInfo.getFieldNames();
- FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+ FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+ List<String> predefinedFields = retainPredefinedField ?
head.getPredefinedFields() : Collections.emptyList();
+ Map<String, String> entries = inLongMsgBody.getEntries();
+ String[] fieldNames = rowFormatInfo.getFieldNames();
GenericRowData row = new GenericRowData(fieldNames.length);
long rowDataLength = 0L;
for (int i = 0; i < predefinedFields.size(); ++i) {
@@ -203,7 +212,8 @@ public class InLongMsgKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral, head, inLongMsgBody,
inLongMsgBody.getData(),
+ failureHandler));
row.setField(i, field);
rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
}
@@ -220,6 +230,7 @@ public class InLongMsgKvUtils {
fieldFormatInfo,
fieldText,
nullLiteral,
+ head, inLongMsgBody, inLongMsgBody.getData(),
failureHandler));
row.setField(i, field);
rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
index fbbfb1e7c3..dfc636a79c 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
@@ -93,7 +93,8 @@ public class InLongMsgPbDeserializationSchema implements
DeserializationSchema<R
}
}
- public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out)
throws Exception {
+ public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out,
+ byte[] originBody) throws Exception {
byte[] decompressed = decompressor.decompress(message);
MessageObjs msgObjs = MessageObjs.parseFrom(decompressed);
List<MessageObj> msgList = msgObjs.getMsgsList();
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index ab095ddaa8..f4f5a8fa24 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -267,10 +267,7 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
List<String> fields = body.getFields();
int actualNumFields = (predefinedFields == null ? 0 :
predefinedFields.size())
+ (fields == null ? 0 : fields.size());
- if (needPrint() && actualNumFields != fieldNameSize) {
- LOG.warn("The number of fields mismatches: " + fieldNameSize +
- " expected, but was " + actualNumFields + ".");
- }
+ checkFieldNameSize(head, body, actualNumFields, fieldNameSize,
failureHandler);
GenericRowData dataRow =
InLongMsgTlogCsvUtils.deserializeRowData(
rowFormatInfo,
@@ -294,18 +291,13 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
List<String> fields = body.getFields();
int actualNumFields = (predefinedFields == null ? 0 :
predefinedFields.size())
+ (fields == null ? 0 : fields.size());
-
- if (needPrint() && actualNumFields != fieldNameSize) {
- LOG.warn("The number of fields mismatches: " + fieldNameSize +
- " expected, but was " + actualNumFields + ".");
- }
-
+ checkFieldNameSize(head, body, actualNumFields, fieldNameSize,
failureHandler);
FormatMsg formatMsg =
InLongMsgTlogCsvUtils.deserializeFormatMsgData(
rowFormatInfo,
nullLiteral,
- head.getPredefinedFields(),
- body.getFields(),
+ head,
+ body,
converters, failureHandler);
GenericRowData genericRowData =
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index db4787f83b..60020e4d18 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -109,7 +109,7 @@ public class InLongMsgTlogCsvUtils {
for (int j = startIndex; j < segments[i].length; j++) {
fields.add(segments[i][j]);
}
- inLongMsgBodies.add(new InLongMsgBody(null, tid, fields,
Collections.emptyMap()));
+ inLongMsgBodies.add(new InLongMsgBody(bytes, text, tid,
fields, Collections.emptyMap()));
}
}
return inLongMsgBodies;
@@ -184,13 +184,13 @@ public class InLongMsgTlogCsvUtils {
}
public static FormatMsg deserializeFormatMsgData(RowFormatInfo
rowFormatInfo,
- String nullLiteral,
- List<String> predefinedFields,
- List<String> fields,
+ String nullLiteral, InLongMsgHead head, InLongMsgBody
inLongMsgBody,
FieldToRowDataConverters.FieldToRowDataConverter[] converters,
FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+ List<String> predefinedFields = head.getPredefinedFields();
+ List<String> fields = inLongMsgBody.getFields();
GenericRowData rowData = new GenericRowData(fieldNames.length);
long rowDataLength = 0L;
@@ -211,7 +211,9 @@ public class InLongMsgTlogCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral, head, inLongMsgBody,
+ inLongMsgBody.getData(),
+ failureHandler));
rowData.setField(i, field);
rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
}
@@ -232,7 +234,8 @@ public class InLongMsgTlogCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral, head, inLongMsgBody,
inLongMsgBody.getData(),
+ failureHandler));
rowData.setField(i + predefinedFields.size(), field);
rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index dff67d924f..9fe68bf5fe 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -247,8 +247,8 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
InLongMsgTlogKvUtils.deserializeFormatMsgData(
rowFormatInfo,
nullLiteral,
- head.getPredefinedFields(),
- body.getEntries(), converters, failureHandler);
+ head,
+ body, converters, failureHandler);
RowData rowData = InLongMsgUtils.decorateRowWithNeededHeadFields(
timeFieldName,
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 57893cbde3..f58eacd881 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -97,10 +97,11 @@ public class InLongMsgTlogKvUtils {
entries = splitKv(segments[1], entryDelimiter, kvDelimiter,
escapeChar, quoteChar,
lineDelimiter, isDeleteEscapeChar);
for (Map<String, String> maps : entries) {
- inLongMsgBodies.add(new InLongMsgBody(null, streamId,
Collections.emptyList(), maps));
+ inLongMsgBodies.add(new InLongMsgBody(bytes, text, streamId,
Collections.emptyList(), maps));
}
} else {
- inLongMsgBodies.add(new InLongMsgBody(null, streamId,
Collections.emptyList(), Collections.emptyMap()));
+ inLongMsgBodies.add(new InLongMsgBody(bytes, text, streamId,
Collections.emptyList(),
+ Collections.emptyMap()));
}
return inLongMsgBodies;
}
@@ -141,7 +142,8 @@ public class InLongMsgTlogKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral,
+ failureHandler));
rowData.setField(i, field);
}
@@ -166,43 +168,45 @@ public class InLongMsgTlogKvUtils {
*
* @param rowFormatInfo The format information of the row.
* @param nullLiteral The literal for null values.
- * @param predefinedFields The predefined fields.
- * @param entries The entries.
+ * @param head The InLongMsg head, from which the predefined fields are read.
+ * @param inLongMsgBody The InLongMsg body, from which the entries are read.
* @return The row FormatMsg from the given entries.
*/
public static FormatMsg deserializeFormatMsgData(
RowFormatInfo rowFormatInfo,
String nullLiteral,
- List<String> predefinedFields,
- Map<String, String> entries,
+ InLongMsgHead head, InLongMsgBody inLongMsgBody,
FieldToRowDataConverter[] converters,
FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+ List<String> predefinedFields = head.getPredefinedFields();
+ Map<String, String> entries = inLongMsgBody.getEntries();
GenericRowData rowData = new GenericRowData(fieldNames.length);
long rowDataLength = 0L;
- for (int i = 0; i < predefinedFields.size(); ++i) {
-
- if (i >= fieldNames.length) {
- break;
+ int predefinedFieldLength = predefinedFields == null ? 0 :
predefinedFields.size();
+ if (predefinedFieldLength > 0) {
+ for (int i = 0; i < predefinedFieldLength; ++i) {
+ if (i >= fieldNames.length) {
+ break;
+ }
+ String fieldName = fieldNames[i];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+ String fieldText = predefinedFields.get(i);
+ FieldToRowDataConverter converter = converters[i];
+ Object field = converter.convert(
+ deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral, head, inLongMsgBody,
inLongMsgBody.getData(), failureHandler));
+ rowData.setField(i, field);
+ rowDataLength += getFormatValueLength(fieldFormatInfos[i],
fieldText);
}
-
- String fieldName = fieldNames[i];
- FormatInfo fieldFormatInfo = fieldFormatInfos[i];
- String fieldText = predefinedFields.get(i);
- FieldToRowDataConverter converter = converters[i];
- Object field = converter.convert(
- deserializeBasicField(
- fieldName,
- fieldFormatInfo,
- fieldText,
- nullLiteral, failureHandler));
- rowData.setField(i, field);
- rowDataLength += getFormatValueLength(fieldFormatInfos[i],
fieldText);
}
- for (int i = predefinedFields.size(); i < fieldNames.length; ++i) {
+ for (int i = predefinedFieldLength; i < fieldNames.length; ++i) {
String fieldName = fieldNames[i];
FormatInfo fieldFormatInfo = fieldFormatInfos[i];
String fieldText = entries.get(fieldName);
@@ -211,7 +215,7 @@ public class InLongMsgTlogKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler));
+ nullLiteral, head, inLongMsgBody, inLongMsgBody.getData(),
failureHandler));
rowData.setField(i, field);
rowDataLength += getFormatValueLength(fieldFormatInfos[i],
fieldText);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
index c0525efa99..16ceb4549a 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
@@ -137,6 +137,20 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
return Objects.equals(failureHandler, that.failureHandler);
}
+ protected void checkFieldNameSize(String body, int actualNumFields, int
fieldNameSize,
+ FailureHandler failureHandler) {
+ if (actualNumFields != fieldNameSize) {
+ if (failureHandler != null) {
+ failureHandler.onFieldNumError(null, null, body,
+ actualNumFields, fieldNameSize);
+ } else {
+ LOG.warn("The number of fields mismatches: {}"
+ + ",expected, but was {}. origin text: {}",
+ fieldNameSize, actualNumFields, body);
+ }
+ }
+ }
+
@Override
public int hashCode() {
return Objects.hash(failureHandler);
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index c705c4344f..e8353ea0b4 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -255,10 +255,7 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
String[] fieldTexts = splitCsv(text, delimiter, escapeChar,
quoteChar);
- if (needPrint() && fieldTexts.length != fieldNameSize) {
- LOG.warn("The number of fields mismatches: expected=[{}],
actual=[{}]. Text=[{}].",
- fieldNames.length, fieldTexts.length, text);
- }
+ checkFieldNameSize(text, fieldTexts.length, fieldNameSize,
failureHandler);
GenericRowData rowData = new GenericRowData(fieldNames.length);
@@ -271,7 +268,7 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
fieldNames[i],
fieldFormatInfos[i],
fieldTexts[i],
- nullLiteral, failureHandler);
+ nullLiteral, null, null, text,
failureHandler);
rowData.setField(i, converters[i].convert(field));
}
@@ -297,10 +294,7 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
String[] fieldTexts = splitCsv(text, delimiter, escapeChar,
quoteChar);
- if (needPrint() && fieldTexts.length != fieldNameSize) {
- LOG.warn("The number of fields mismatches: expected=[{}],
actual=[{}]. Text=[{}].",
- fieldNames.length, fieldTexts.length, text);
- }
+ checkFieldNameSize(text, fieldTexts.length, fieldNameSize,
failureHandler);
GenericRowData rowData = new GenericRowData(fieldNames.length);
@@ -313,7 +307,7 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
fieldNames[i],
fieldFormatInfos[i],
fieldTexts[i],
- nullLiteral, failureHandler);
+ nullLiteral, null, null, text,
failureHandler);
rowData.setField(i, converters[i].convert(field));
rowDataLength += getFormatValueLength(fieldFormatInfos[i],
fieldTexts[i]);
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index 200719d145..e96e31cde2 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -202,7 +202,7 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler);
+ nullLiteral, null, null, text, failureHandler);
rowData.setField(i, converters[i].convert(field));
}
return rowData;
@@ -237,7 +237,7 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral, failureHandler);
+ nullLiteral, null, null, text, failureHandler);
rowData.setField(i, converters[i].convert(field));
rowDataLength += getFormatValueLength(fieldFormatInfo,
fieldText);
}