This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5a58040909 [INLONG-9791][Sort] Return null instead of throwing an
exception when deserialization by type fails (#9792)
5a58040909 is described below
commit 5a58040909423aebd25a2bf40df80a23480bf047
Author: baomingyu <[email protected]>
AuthorDate: Tue Mar 12 11:22:30 2024 +0800
[INLONG-9791][Sort] Return null instead of throwing an exception when
deserialization by type fails (#9792)
---
.../inlong/sort/formats/base/TableFormatUtils.java | 9 ++++--
.../sort/formats/inlongmsg/FailureHandler.java | 20 ++++++++++++
.../formats/inlongmsg/IgnoreFailureHandler.java | 10 ++++++
.../sort/formats/inlongmsg/NoOpFailureHandler.java | 11 +++++++
.../sort/formats/base/TableFormatUtilsTest.java | 10 +++---
.../formats/csv/CsvDeserializationSchemaTest.java | 11 ++++---
.../InLongMsgCsvFormatDeserializerTest.java | 20 +++++++++---
.../InLongMsgKvFormatDeserializerTest.java | 2 +-
.../InLongMsgTlogCsvFormatDeserializerTest.java | 2 +-
.../InLongMsgTlogKvFormatDeserializerTest.java | 2 +-
.../formats/kv/KvDeserializationSchemaTest.java | 2 +-
.../InLongMsgCsvFormatDeserializerTest.java | 35 +++++++++++++-------
.../InLongMsgKvFormatDeserializerTest.java | 2 +-
.../InLongMsgTlogCsvFormatDeserializerTest.java | 2 +-
.../InLongMsgTlogKvFormatDeserializerTest.java | 2 +-
.../formats/base/DefaultDeserializationSchema.java | 37 ++++++++++------------
.../sort/formats/base/TableFormatUtilsTest.java | 10 +++---
.../csv/CsvRowDataDeserializationSchema.java | 8 ++---
.../csv/CsvRowDataDeserializationSchemaTest.java | 9 ++++--
.../json/JsonRowDataDeserializationSchema.java | 12 +++++--
.../formats/json/FieldToRowDataConvertersTest.java | 2 ++
.../json/JsonRowDataDeserializationSchemaTest.java | 4 ---
.../formats/json/JsonRowDataSerDeTestBase.java | 2 ++
.../formats/json/RowDataToFieldConvertersTest.java | 6 ++--
.../formats/kv/KvRowDataDeserializationSchema.java | 10 +++---
.../kv/KvRowDataDeserializationSchemaTest.java | 2 +-
26 files changed, 162 insertions(+), 80 deletions(-)
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index d83108de39..430b89b32f 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -92,6 +92,8 @@ import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.math.BigDecimal;
@@ -111,6 +113,8 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SC
*/
public class TableFormatUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(TableFormatUtils.class);
+
/**
* Returns the {@link DeserializationSchema} described by the given
* properties.
@@ -567,9 +571,10 @@ public class TableFormatUtils {
try {
return ((BasicFormatInfo<?>)
fieldFormatInfo).deserialize(fieldText);
} catch (Exception e) {
- throw new RuntimeException("Could not properly deserialize the "
- + "text " + fieldText + " for field " + fieldName + ".",
e);
+ LOG.warn("Could not properly deserialize the " + "text "
+ + fieldText + " for field " + fieldName + ".", e);
}
+ return null;
}
/**
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
index 7dff44fa10..6bdbf36103 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
@@ -24,6 +24,25 @@ import java.io.Serializable;
*/
public interface FailureHandler extends Serializable {
+ /**
+ * This method is called when a failure occurs while parsing, to check
+ * whether parsing failures should be ignored.
+ *
+ */
+ default boolean isIgnoreFailure() {
+ return false;
+ };
+
+ /**
+ * This method is called when a failure occurs while parsing a
non-InLong message.
+ *
+ * @param msg the msg byte
+ * @param exception the thrown exception
+ * @throws Exception the exception
+ */
+ default void onParsingMsgFailure(Object msg, Exception exception) throws
Exception {
+ };
+
/**
* This method is called when there is a failure occurred while parsing
InLongMsg head.
*
@@ -51,4 +70,5 @@ public interface FailureHandler extends Serializable {
* @throws Exception the exception
*/
void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body,
Exception exception) throws Exception;
+
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
index 6ac0b956f2..b27fefc891 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
@@ -27,6 +27,11 @@ public class IgnoreFailureHandler implements FailureHandler {
private static final Logger LOG =
LoggerFactory.getLogger(IgnoreFailureHandler.class);
+ @Override
+ public void onParsingMsgFailure(Object msg, Exception exception) {
+ LOG.error("Could not properly deserialize msg=[{}].", msg, exception);
+ };
+
@Override
public void onParsingHeadFailure(String attribute, Exception exception) {
LOG.warn("Cannot properly parse the head {}", attribute, exception);
@@ -42,6 +47,11 @@ public class IgnoreFailureHandler implements FailureHandler {
LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body,
exception);
}
+ @Override
+ public boolean isIgnoreFailure() {
+ return true;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
index 66eeb19e69..d049920be1 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
@@ -27,6 +27,12 @@ public class NoOpFailureHandler implements FailureHandler {
private static final Logger LOG =
LoggerFactory.getLogger(NoOpFailureHandler.class);
+ @Override
+ public void onParsingMsgFailure(Object msg, Exception t) throws Exception {
+ LOG.error("Could not properly serialize msg=[{}].", msg, t);
+ throw t;
+ }
+
@Override
public void onParsingHeadFailure(String attribute, Exception exception)
throws Exception {
LOG.error("Cannot properly parse the head {}", attribute, exception);
@@ -45,6 +51,11 @@ public class NoOpFailureHandler implements FailureHandler {
throw exception;
}
+ @Override
+ public boolean isIgnoreFailure() {
+ return false;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 9aada97b24..189fda1e0e 100644
---
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -20,13 +20,13 @@ package org.apache.inlong.sort.formats.base;
import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.junit.Assert;
import org.junit.Test;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
/**
* Tests for {@link TableFormatUtils}.
@@ -174,23 +174,23 @@ public class TableFormatUtilsTest {
assertEquals(1, result1);
try {
- deserializeBasicField(
+ Object result2 = deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"",
"n/a");
- fail("The method is expected to throw an exception.");
+ Assert.assertEquals(null, result2);
} catch (Exception e) {
// ignored
}
- Object result2 =
+ Object result3 =
deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"n/a",
"n/a");
- assertNull(result2);
+ assertNull(result3);
}
@Test
diff --git
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
index c7cf93fe9c..9ddc0c4a74 100644
---
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
+++
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
@@ -75,9 +75,11 @@ public class CsvDeserializationSchemaTest {
testBasicDeserialization(config, LongFormatInfo.INSTANCE,
12345678910L, "12345678910");
testBasicDeserialization(config, FloatFormatInfo.INSTANCE,
0.33333334f, "0.33333334");
testBasicDeserialization(config, DoubleFormatInfo.INSTANCE,
0.33333333332, "0.33333333332");
- testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, new
BigDecimal("1234.0000000000000000000000001"),
+ testBasicDeserialization(config, DecimalFormatInfo.INSTANCE,
+ new BigDecimal("1234.0000000000000000000000001"),
"1234.0000000000000000000000001");
- testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"),
Date.valueOf("2020-03-22"), "22/03/2020");
+ testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"),
+ Date.valueOf("2020-03-22"), "22/03/2020");
testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"),
Time.valueOf("11:12:13"), "13/12/11");
testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy
hh:mm:ss"),
Timestamp.valueOf("2020-03-22 11:12:13"), "22/03/2020
11:12:13");
@@ -101,7 +103,8 @@ public class CsvDeserializationSchemaTest {
testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, null,
nullLiteral);
testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"),
null, nullLiteral);
testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), null,
nullLiteral);
- testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy
hh:mm:ss"), null, nullLiteral);
+ testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy
hh:mm:ss"),
+ null, nullLiteral);
}
@Test
@@ -179,7 +182,7 @@ public class CsvDeserializationSchemaTest {
"1,field1,field2,field3,field4".getBytes());
}
- @Test(expected = Exception.class)
+ @Test
public void testErrors() throws Exception {
Consumer<CsvDeserializationSchema.Builder> config = builder -> {
};
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 7a65ee7e8a..c2c0239518 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -203,7 +203,7 @@ public class InLongMsgCsvFormatDeserializerTest {
List<Row> actualRows = new ArrayList<>();
Collector<Row> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg.buildArray(), collector);
- assertEquals(1, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg inLongMsg2 = InLongMsg.newInLongMsg();
String abNormalAttrs =
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -461,6 +461,16 @@ public class InLongMsgCsvFormatDeserializerTest {
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
+ Row expectedRow1 = Row.of(
+ Timestamp.valueOf("2020-03-22 00:00:00"),
+ expectedAttributes,
+ 1,
+ 2,
+ null,
+ "field11",
+ "field12",
+ "field13");
+
Row expectedRow2 = Row.of(
Timestamp.valueOf("2020-03-22 00:00:00"),
expectedAttributes,
@@ -471,10 +481,10 @@ public class InLongMsgCsvFormatDeserializerTest {
"field22",
"field23");
- testRowDeserialization(
- deserializer,
- inLongMsg1.buildArray(),
- Collections.singletonList(expectedRow2));
+ List list = new ArrayList();
+ list.add(expectedRow1);
+ list.add(expectedRow2);
+ testRowDeserialization(deserializer, inLongMsg1.buildArray(), list);
}
@Test
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index c29258ee42..abfb25a536 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -87,7 +87,7 @@ public class InLongMsgKvFormatDeserializerTest {
List<Row> actualRows = new ArrayList<>();
Collector<Row> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg.buildArray(), collector);
- assertEquals(1, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg InLongMsgHead = InLongMsg.newInLongMsg();
String abNormalAttrs =
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 545ea9d2ae..fae60c03a4 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -86,7 +86,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
List<Row> actualRows = new ArrayList<>();
Collector<Row> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg1.buildArray(), collector);
- assertEquals(2, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index 4ac7d3edb2..72d9604719 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -89,7 +89,7 @@ public class InLongMsgTlogKvFormatDeserializerTest {
List<Row> actualRows = new ArrayList<>();
Collector<Row> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg1.buildArray(), collector);
- assertEquals(2, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
diff --git
a/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
b/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
index 5474e68d2a..144c2aa889 100644
---
a/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
+++
b/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
@@ -169,7 +169,7 @@ public class KvDeserializationSchemaTest {
"f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16));
}
- @Test(expected = Exception.class)
+ @Test
public void testErrors() throws Exception {
Consumer<KvDeserializationSchema.Builder> config = builder -> {
};
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 9857c6ece0..fcd6f468e7 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -286,7 +286,7 @@ public class InLongMsgCsvFormatDeserializerTest {
List<RowData> actualRows = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg.buildArray(), collector);
- assertEquals(1, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs =
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -545,20 +545,33 @@ public class InLongMsgCsvFormatDeserializerTest {
expectedAttributes.put("__addcol1__", "1");
expectedAttributes.put("__addcol2__", "2");
- GenericRowData expectRowData = new GenericRowData(8);
- expectRowData.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
- expectRowData.setField(1, mapConvert.convert(expectedAttributes));
- expectRowData.setField(2, 1);
- expectRowData.setField(3, 2);
- expectRowData.setField(4, 123);
- expectRowData.setField(5, StringData.fromString("field21"));
- expectRowData.setField(6, StringData.fromString("field22"));
- expectRowData.setField(7, StringData.fromString("field23"));
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, null);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString("field13"));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString("field22"));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+ List expectList = new ArrayList();
+ expectList.add(expectRowData1);
+ expectList.add(expectRowData2);
testRowDeserialization(
deserializer,
inLongMsg.buildArray(),
- Collections.singletonList(expectRowData));
+ expectList);
}
@Test
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index 39f3fd83d7..9025cbc439 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -96,7 +96,7 @@ public class InLongMsgKvFormatDeserializerTest {
List<RowData> actualRows = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg.buildArray(), collector);
- assertEquals(1, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
String abNormalAttrs =
"m=0&iname=testInterfaceId&__addcol1__=1&__addcol2__=2";
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index ee78cd9ce2..664e8de31c 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -103,7 +103,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
List<RowData> actualRows = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg1.buildArray(), collector);
- assertEquals(2, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index d76b63ba41..e797a9774f 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -98,7 +98,7 @@ public class InLongMsgTlogKvFormatDeserializerTest {
List<RowData> actualRowDatas = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRowDatas);
deserializer.flatMap(inLongMsg1.buildArray(), collector);
- assertEquals(2, errorHandler.getRowCount());
+ assertEquals(0, errorHandler.getRowCount());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
index 8a05de9b92..c2d55af15d 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.formats.base;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.IgnoreFailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.NoOpFailureHandler;
import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -39,15 +42,7 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
private static final Logger LOG =
LoggerFactory.getLogger(DefaultDeserializationSchema.class);
- /**
- * If true, the deserialization error will be ignored.
- */
- private final boolean ignoreErrors;
-
- /**
- * If true, a parsing error is occurred.
- */
- private boolean errorOccurred = false;
+ protected FailureHandler failureHandler;
/**
* The format metric group.
@@ -55,7 +50,15 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
protected transient FormatMetricGroup formatMetricGroup;
public DefaultDeserializationSchema(boolean ignoreErrors) {
- this.ignoreErrors = ignoreErrors;
+ if (ignoreErrors) {
+ failureHandler = new IgnoreFailureHandler();
+ } else {
+ failureHandler = new NoOpFailureHandler();
+ }
+ }
+
+ public DefaultDeserializationSchema(FailureHandler failureHandler) {
+ this.failureHandler = failureHandler;
}
@Override
@@ -82,14 +85,12 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
try {
T result = deserializeInternal(bytes);
// reset error state after deserialize success
- errorOccurred = false;
return result;
} catch (Exception e) {
- errorOccurred = true;
if (formatMetricGroup != null) {
formatMetricGroup.getNumRecordsDeserializeError().inc(1L);
}
- if (ignoreErrors) {
+ if (failureHandler != null && failureHandler.isIgnoreFailure()) {
if (formatMetricGroup != null) {
formatMetricGroup.getNumRecordsDeserializeErrorIgnored().inc(1L);
}
@@ -106,11 +107,7 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
}
}
- public boolean skipCurrentRecord(T element) {
- return ignoreErrors && errorOccurred;
- }
-
- protected abstract T deserializeInternal(byte[] bytes) throws IOException;
+ protected abstract T deserializeInternal(byte[] bytes) throws Exception;
@Override
public boolean equals(Object object) {
@@ -121,11 +118,11 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
return false;
}
DefaultDeserializationSchema<?> that =
(DefaultDeserializationSchema<?>) object;
- return Objects.equals(ignoreErrors, that.ignoreErrors);
+ return Objects.equals(failureHandler, that.failureHandler);
}
@Override
public int hashCode() {
- return Objects.hash(ignoreErrors);
+ return Objects.hash(failureHandler);
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 71c19a8cc8..39126370a7 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -40,7 +40,6 @@ import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getDataType;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
/**
* Tests for {@link TableFormatForRowDataUtils}.
@@ -188,23 +187,24 @@ public class TableFormatUtilsTest {
assertEquals(1, result1);
try {
- deserializeBasicField(
+ Object result2 = deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"",
"n/a");
- fail("The method is expected to throw an exception.");
+
+ assertNull(result2);
} catch (Exception e) {
// ignored
}
- Object result2 =
+ Object result3 =
deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"n/a",
"n/a");
- assertNull(result2);
+ assertNull(result3);
}
@Test
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index ee36e98126..4541d0d555 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -210,7 +210,7 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
}
@Override
- public RowData deserializeInternal(@Nullable byte[] message) {
+ public RowData deserializeInternal(@Nullable byte[] message) throws
Exception {
if (message == null) {
return null;
}
@@ -241,12 +241,12 @@ public final class CsvRowDataDeserializationSchema
extends DefaultDeserializatio
rowData.setField(i, converters[i].convert(field));
}
}
-
return rowData;
} catch (Throwable t) {
- throw new RuntimeException(
- String.format("Could not properly deserialize csv.
Text=[%s].", text), t);
+ failureHandler.onParsingMsgFailure(text, new RuntimeException(
+ String.format("Could not properly deserialize csv.
Text=[{}].", text), t));
}
+ return null;
}
@Override
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
index 6d0c848e57..d804bc6fa9 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
@@ -275,7 +275,7 @@ public class CsvRowDataDeserializationSchemaTest extends
TestLogger {
false);
}
- @Test(expected = Exception.class)
+ @Test
public void testErrors() throws Exception {
Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
};
@@ -295,9 +295,14 @@ public class CsvRowDataDeserializationSchemaTest extends
TestLogger {
public void testIgnoreErrors() throws Exception {
Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
};
+ GenericRowData rowData = new GenericRowData(4);
+ rowData.setField(0, null);
+ rowData.setField(1, StringData.fromString("field1"));
+ rowData.setField(2, StringData.fromString("field2"));
+ rowData.setField(3, StringData.fromString("field3"));
testRowDataDeserialization(
config,
- null,
+ rowData,
"na,field1,field2,field3".getBytes(),
true);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
index 88394f6560..386f75f998 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.types.logical.RowType;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,13 +103,20 @@ public class JsonRowDataDeserializationSchema extends
DefaultDeserializationSche
}
@Override
- public RowData deserializeInternal(@Nullable byte[] message) throws
IOException {
+ public RowData deserializeInternal(@Nullable byte[] message) throws
Exception {
if (message == null) {
return null;
}
String jsonStr = new String(message, charset);
- return (RowData) runtimeConverter.convert(jsonStr);
+ RowData rowData = null;
+ try {
+ rowData = (RowData) runtimeConverter.convert(jsonStr);
+ } catch (Throwable t) {
+ failureHandler.onParsingMsgFailure(jsonStr, new RuntimeException(
+ String.format("Could not properly deserialize json.
Text=[%s].", jsonStr), t));
+ }
+ return rowData;
}
@Override
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
index 01fdde20f8..c0b2b08c72 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
@@ -36,6 +36,7 @@ import java.sql.Timestamp;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
+import java.util.TimeZone;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.table.api.DataTypes.ARRAY;
@@ -126,6 +127,7 @@ public class FieldToRowDataConvertersTest {
@Test
public void testConvertToTimestampWithLocalZone() throws IOException {
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT+0"));
FieldToRowDataConverter converter =
converters.createConverter(TIMESTAMP_WITH_LOCAL_TIME_ZONE().getLogicalType());
TimestampData expected = TimestampData.fromTimestamp(new Timestamp(0));
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
index 9d0bf64cea..2e9e62a889 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
@@ -35,7 +35,6 @@ import java.util.Objects;
import static org.apache.inlong.sort.formats.base.TextFormatOptions.CHARSET;
import static
org.apache.inlong.sort.formats.base.TextFormatOptionsUtil.ISO_8601;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -65,7 +64,6 @@ public class JsonRowDataDeserializationSchemaTest extends
JsonRowDataSerDeTestBa
assertEquals(testRowData, rowData);
assertNotNull(rowData);
- assertFalse(deserializationSchema.skipCurrentRecord(rowData));
}
// @Test
@@ -128,10 +126,8 @@ public class JsonRowDataDeserializationSchemaTest extends
JsonRowDataSerDeTestBa
.build();
RowData rowData = deserializationSchema.deserialize(data);
assertNull(rowData);
- assertTrue(deserializationSchema.skipCurrentRecord(rowData));
rowData =
deserializationSchema.deserialize((testJson.getBytes(CHARSET.defaultValue())));
assertNotNull(rowData);
- assertFalse(deserializationSchema.skipCurrentRecord(rowData));
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
index 24b2875e40..dee4961f2e 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
@@ -35,6 +35,7 @@ import java.sql.Timestamp;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
+import java.util.TimeZone;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.table.api.DataTypes.ARRAY;
@@ -68,6 +69,7 @@ public abstract class JsonRowDataSerDeTestBase {
@Before
public void init() {
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT+0"));
byte[] bytes = new byte[10];
ThreadLocalRandom.current().nextBytes(bytes);
String base64Str = Base64.getEncoder().encodeToString(bytes);
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
index ad4ce0fc45..63bfd28ed9 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
@@ -35,6 +35,7 @@ import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
+import java.util.TimeZone;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.table.api.DataTypes.ARRAY;
@@ -125,12 +126,11 @@ public class RowDataToFieldConvertersTest {
@Test
public void testConvertTimestampWithLocalZone() {
+ TimeZone.setDefault(TimeZone.getTimeZone("GMT+0"));
RowDataToFieldConverter converter =
converters.createConverter(TIMESTAMP_WITH_LOCAL_TIME_ZONE().getLogicalType());
TimestampData testTimestampData = TimestampData.fromTimestamp(new
Timestamp(0));
- // assertEquals("1970-01-01 00:00:00Z",
converter.convert(testTimestampData));
- assertTrue("1970-01-01
00:00:00Z".equals(converter.convert(testTimestampData))
- || "1970-01-01
08:00:00Z".equals(converter.convert(testTimestampData)));
+ assertEquals("1970-01-01 00:00:00Z",
converter.convert(testTimestampData));
}
@Test
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index d0bb4f1735..033a51135e 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -145,8 +145,9 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
}
@Override
- public RowData deserializeInternal(byte[] bytes) throws IOException {
+ public RowData deserializeInternal(byte[] bytes) throws Exception {
String text = new String(bytes, Charset.forName(charset));
+ GenericRowData rowData = null;
try {
Map<String, String> fieldTexts =
splitKv(text, entryDelimiter, kvDelimiter, escapeChar,
quoteChar);
@@ -154,7 +155,7 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos =
rowFormatInfo.getFieldFormatInfos();
- GenericRowData rowData = new
GenericRowData(fieldFormatInfos.length);
+ rowData = new GenericRowData(fieldFormatInfos.length);
for (int i = 0; i < fieldFormatInfos.length; i++) {
String fieldName = fieldNames[i];
FormatInfo fieldFormatInfo = fieldFormatInfos[i];
@@ -170,9 +171,10 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
}
return rowData;
} catch (Throwable t) {
- throw new IOException(
- String.format("Could not properly deserialize kv.
Text=[%s].", text), t);
+ failureHandler.onParsingMsgFailure(text, new RuntimeException(
+ String.format("Could not properly deserialize kv.
Text=[%s].", text), t));
}
+ return null;
}
@Override
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
index a0350905e3..bcc0dc3911 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
@@ -231,7 +231,7 @@ public class KvRowDataDeserializationSchemaTest {
"f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16));
}
- @Test(expected = Exception.class)
+ @Test
public void testErrors() throws Exception {
Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> {
};