This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2c51a6282 [cdc] Do not throw exception when cdc table change is null (#1702)
2c51a6282 is described below
commit 2c51a6282ec4b7ccff97123a0f0154ac0ff283d7
Author: JunZhang <[email protected]>
AuthorDate: Tue Aug 8 13:36:31 2023 +0800
[cdc] Do not throw exception when cdc table change is null (#1702)
---
.../flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 4aac385b2..418db64d2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -187,9 +187,10 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
String historyRecordString = historyRecord.asText();
JsonNode tableChanges =
objectMapper.readTree(historyRecordString).get("tableChanges");
if (tableChanges.size() != 1) {
- throw new IllegalArgumentException(
+ LOG.error(
"Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
- + historyRecordString);
+ + historyRecord.asText());
+ return Collections.emptyList();
}
columns = tableChanges.get(0).get("table").get("columns");
} catch (Exception e) {
@@ -238,9 +239,10 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
String historyRecordString = historyRecord.asText();
JsonNode tableChanges =
objectMapper.readTree(historyRecordString).get("tableChanges");
if (tableChanges.size() != 1) {
- throw new IllegalArgumentException(
+ LOG.error(
"Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
+ historyRecord.asText());
+ return Optional.empty();
}
JsonNode tableChange = tableChanges.get(0);