This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new e0e09313b [FLINK-37217][mysql] Fix `MySqlErrorHandler`
TableNotFoundException Unable to obtain table correctly (#3892)
e0e09313b is described below
commit e0e09313b3afa81b1bfaeb47a8a1ace1365cb330
Author: big face cat <[email protected]>
AuthorDate: Sat Oct 11 10:32:07 2025 +0800
[FLINK-37217][mysql] Fix `MySqlErrorHandler` TableNotFoundException Unable
to obtain table correctly (#3892)
Co-authored-by: huyuanfeng <[email protected]>
---
.../debezium/task/context/MySqlErrorHandler.java | 22 +++++++++++++---------
1 file changed, 13 insertions(+), 9 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
index a0133b548..47e3e6033 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java
@@ -32,6 +32,7 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -60,12 +61,9 @@ public class MySqlErrorHandler extends ErrorHandler {
@Override
public void setProducerThrowable(Throwable producerThrowable) {
- if (isTableNotFoundException(producerThrowable)) {
- Matcher matcher =
- NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
- String databaseName = matcher.group(1);
- String tableName = matcher.group(2);
- TableId tableId = new TableId(databaseName, null, tableName);
+ Optional<TableId> notFoundTable = extractNotFoundTableId(producerThrowable);
+ if (notFoundTable.isPresent()) {
+ TableId tableId = notFoundTable.get();
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
@@ -86,14 +84,20 @@ public class MySqlErrorHandler extends ErrorHandler {
super.setProducerThrowable(producerThrowable);
}
- private boolean isTableNotFoundException(Throwable t) {
+ private Optional<TableId> extractNotFoundTableId(Throwable t) {
if (!(t.getCause() instanceof DebeziumException)) {
- return false;
+ return Optional.empty();
}
DebeziumException e = (DebeziumException) t.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
- return matcher.find();
+ if (matcher.find()) {
+ String databaseName = matcher.group(1);
+ String tableName = matcher.group(2);
+ return Optional.of(new TableId(databaseName, null, tableName));
+ } else {
+ return Optional.empty();
+ }
}
private boolean isSchemaOutOfSyncException(Throwable t) {