This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new e0e09313b [FLINK-37217][mysql] Fix `MySqlErrorHandler` 
TableNotFoundException Unable to obtain table correctly (#3892)
e0e09313b is described below

commit e0e09313b3afa81b1bfaeb47a8a1ace1365cb330
Author: big face cat <[email protected]>
AuthorDate: Sat Oct 11 10:32:07 2025 +0800

    [FLINK-37217][mysql] Fix `MySqlErrorHandler` TableNotFoundException Unable 
to obtain table correctly (#3892)
    
    Co-authored-by: huyuanfeng <[email protected]>
---
 .../debezium/task/context/MySqlErrorHandler.java   | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
index a0133b548..47e3e6033 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
@@ -32,6 +32,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -60,12 +61,9 @@ public class MySqlErrorHandler extends ErrorHandler {
 
     @Override
     public void setProducerThrowable(Throwable producerThrowable) {
-        if (isTableNotFoundException(producerThrowable)) {
-            Matcher matcher =
-                    
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
-            String databaseName = matcher.group(1);
-            String tableName = matcher.group(2);
-            TableId tableId = new TableId(databaseName, null, tableName);
+        Optional<TableId> notFoundTable = 
extractNotFoundTableId(producerThrowable);
+        if (notFoundTable.isPresent()) {
+            TableId tableId = notFoundTable.get();
             if (context.getSchema().schemaFor(tableId) == null) {
                 LOG.warn("Schema for table " + tableId + " is null");
                 return;
@@ -86,14 +84,20 @@ public class MySqlErrorHandler extends ErrorHandler {
         super.setProducerThrowable(producerThrowable);
     }
 
-    private boolean isTableNotFoundException(Throwable t) {
+    private Optional<TableId> extractNotFoundTableId(Throwable t) {
         if (!(t.getCause() instanceof DebeziumException)) {
-            return false;
+            return Optional.empty();
         }
         DebeziumException e = (DebeziumException) t.getCause();
         String detailMessage = e.getMessage();
         Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
-        return matcher.find();
+        if (matcher.find()) {
+            String databaseName = matcher.group(1);
+            String tableName = matcher.group(2);
+            return Optional.of(new TableId(databaseName, null, tableName));
+        } else {
+            return Optional.empty();
+        }
     }
 
     private boolean isSchemaOutOfSyncException(Throwable t) {

Reply via email to