This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c51a6282 [cdc] Do not throw exception when cdc table change is null (#1702)
2c51a6282 is described below

commit 2c51a6282ec4b7ccff97123a0f0154ac0ff283d7
Author: JunZhang <[email protected]>
AuthorDate: Tue Aug 8 13:36:31 2023 +0800

    [cdc] Do not throw exception when cdc table change is null (#1702)
---
 .../flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java      | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 4aac385b2..418db64d2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -187,9 +187,10 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
             String historyRecordString = historyRecord.asText();
             JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges");
             if (tableChanges.size() != 1) {
-                throw new IllegalArgumentException(
+                LOG.error(
                         "Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
-                                + historyRecordString);
+                                + historyRecord.asText());
+                return Collections.emptyList();
             }
             columns = tableChanges.get(0).get("table").get("columns");
         } catch (Exception e) {
@@ -238,9 +239,10 @@ public class MySqlDebeziumJsonEventParser implements EventParser<String> {
             String historyRecordString = historyRecord.asText();
             JsonNode tableChanges = objectMapper.readTree(historyRecordString).get("tableChanges");
             if (tableChanges.size() != 1) {
-                throw new IllegalArgumentException(
+                LOG.error(
                         "Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
                                 + historyRecord.asText());
+                return Optional.empty();
             }
 
             JsonNode tableChange = tableChanges.get(0);

Reply via email to