libenchao commented on code in PR #25919:
URL: https://github.com/apache/flink/pull/25919#discussion_r1928378346
##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -214,6 +215,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
if (message == null || message.length == 0) {
return;
}
+ List<GenericRowData> genericRowDataList = new ArrayList<>();
Review Comment:
We can add a transient field to `CanalJsonDeserializationSchema` to avoid
creating a new list for every record. It's more efficient, WDYT?
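
A minimal sketch of that idea, under stated assumptions: `ReusableBufferDeserializer` is a hypothetical stand-in rather than the real schema class, rows are plain strings, and the comma-splitting "parse" step is invented only to have something to buffer. The list is created lazily because a transient field is not restored when the object is deserialized.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a deserialization schema; not the Flink class.
class ReusableBufferDeserializer implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is created lazily
    // on first use instead of in a field initializer.
    private transient List<String> buffer;

    void deserialize(byte[] message, Consumer<String> out) {
        if (message == null || message.length == 0) {
            return;
        }
        if (buffer == null) {
            buffer = new ArrayList<>();
        }
        // Reuse the same list for every record instead of allocating a new one.
        buffer.clear();

        // Assumed toy "parsing": one row per comma-separated token.
        String text = new String(message, StandardCharsets.UTF_8);
        for (String row : text.split(",")) {
            buffer.add(row);
        }

        // Emit only after the whole message has been parsed.
        for (String row : buffer) {
            out.accept(row);
        }
    }
}
```

The trade-off is that the buffer is shared state, so this only works if `deserialize` is never called concurrently on the same instance.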
##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -73,10 +76,69 @@ void testFilteringTables() throws Exception {
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
.setDatabase("^my.*")
.setTable("^prod.*")
+ .setIgnoreParseErrors(true)
Review Comment:
What's the purpose of enabling `setIgnoreParseErrors(true)` for this test case?
##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -319,4 +381,22 @@ public void close() {
// do nothing
}
}
+
+ private static class SpecialCollector implements Collector<RowData> {
Review Comment:
How about naming it `ThrowingExceptionCollector`?
##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -319,4 +381,22 @@ public void close() {
// do nothing
}
}
+
+ private static class SpecialCollector implements Collector<RowData> {
+
+ private final List<RowData> list = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ if (record.getRowKind().equals(RowKind.DELETE) && record.getInt(0) == 103) {
Review Comment:
The criterion is too specific. Could we just throw an exception directly, no
matter what the data is? We only need to test that when an exception occurs
in a chained downstream operator, it is correctly thrown.
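
A rough sketch of such a collector, assuming the `ThrowingExceptionCollector` name suggested in the earlier comment. The `Collector` interface below is a local stand-in declared only so the snippet compiles on its own; in the test class it would be Flink's `Collector<RowData>`. The exception message is made up for illustration.

```java
// Local stand-in for the collector interface, declared here only so the
// sketch is self-contained.
interface Collector<T> {
    void collect(T record);

    void close();
}

// Fails on every record, regardless of its content, to simulate an
// exception raised by a chained downstream operator.
class ThrowingExceptionCollector<T> implements Collector<T> {

    @Override
    public void collect(T record) {
        // Hypothetical message; any unchecked exception would serve.
        throw new RuntimeException("Simulated failure in downstream operator");
    }

    @Override
    public void close() {
        // do nothing
    }
}
```

With this, the test no longer depends on a particular row kind or field value and only has to assert that the exception propagates out of `deserialize`.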