This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 23ab3edbbb [Hotfix][CDC] Fix ddl duplicate execution error when config multi_table_sink_replica (#7634)
23ab3edbbb is described below

commit 23ab3edbbb4a0fd3a25e3317ccf5a60ed4428aa9
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 23 09:27:23 2024 +0800

    [Hotfix][CDC] Fix ddl duplicate execution error when config multi_table_sink_replica (#7634)
---
 .../sink/multitablesink/MultiTableSinkWriter.java  |   8 +
 .../mysql/source/MySqlSchemaChangeResolver.java    |   1 +
 .../iceberg/sink/writer/IcebergRecordWriter.java   |   2 +
 .../seatunnel/iceberg/utils/SchemaUtils.java       |  14 +-
 .../jdbc/internal/dialect/JdbcDialect.java         | 192 +++++++++++++++------
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |  27 +--
 .../dialect/oceanbase/OceanBaseMysqlDialect.java   |  27 +--
 .../jdbc/sink/AbstractJdbcSinkWriter.java          |  32 ++--
 .../mysqlcdc_to_mysql_with_schema_change.conf      |   3 +
 .../mysql_cdc_to_iceberg_for_schema_change.conf    |   3 +
 10 files changed, 198 insertions(+), 111 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 649417351a..5a2f51fd16 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -144,9 +144,17 @@ public class MultiTableSinkWriter
                         .getKey()
                         .getTableIdentifier()
                         .equals(event.tablePath().getFullName())) {
+                    log.info(
+                            "Start apply schema change for table {} sub-writer {}",
+                            sinkWriterEntry.getKey().getTableIdentifier(),
+                            sinkWriterEntry.getKey().getIndex());
                     synchronized (runnable.get(i)) {
                         sinkWriterEntry.getValue().applySchemaChange(event);
                     }
+                    log.info(
+                            "Finish apply schema change for table {} sub-writer {}",
+                            sinkWriterEntry.getKey().getTableIdentifier(),
+                            sinkWriterEntry.getKey().getIndex());
                 }
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
index bd386e9bb5..3ea4a0dfce 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
@@ -64,6 +64,7 @@ public class MySqlSchemaChangeResolver extends 
AbstractSchemaChangeResolver {
         customMySqlAntlrDdlParser.parse(ddl, tables);
         List<AlterTableColumnEvent> parsedEvents =
                 customMySqlAntlrDdlParser.getAndClearParsedEvents();
+        parsedEvents.forEach(e -> 
e.setSourceDialectName(DatabaseIdentifier.MYSQL));
         AlterTableColumnsEvent alterTableColumnsEvent =
                 new AlterTableColumnsEvent(
                         TableIdentifier.of(
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 06b48591df..ce83a8641e 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -91,6 +91,8 @@ public class IcebergRecordWriter implements RecordWriter {
     public void applySchemaChange(SeaTunnelRowType afterRowType, 
SchemaChangeEvent event) {
         log.info("Apply schema change start.");
         SchemaChangeWrapper updates = new SchemaChangeWrapper();
+        // get the latest schema in case another process updated it
+        table.refresh();
         Schema schema = table.schema();
         if (event instanceof AlterTableDropColumnEvent) {
             AlterTableDropColumnEvent dropColumnEvent = 
(AlterTableDropColumnEvent) event;
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 5047746e9e..9aba4a777d 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -207,14 +207,17 @@ public class SchemaUtils {
                         .filter(updateType -> !typeMatches(table.schema(), 
updateType))
                         .collect(toList());
 
-        // filter out columns that have the updated type
+        // filter out columns that have already been deleted
         List<SchemaDeleteColumn> deleteColumns =
                 wrapper.deleteColumns().stream()
                         .filter(deleteColumn -> findColumns(table.schema(), 
deleteColumn))
                         .collect(toList());
 
-        // Rename column name
-        List<SchemaChangeColumn> changeColumns = new 
ArrayList<>(wrapper.changeColumns());
+        // filter out columns that have already been changed
+        List<SchemaChangeColumn> changeColumns =
+                wrapper.changeColumns().stream()
+                        .filter(changeColumn -> findColumns(table.schema(), 
changeColumn))
+                        .collect(toList());
 
         if (addColumns.isEmpty()
                 && modifyColumns.isEmpty()
@@ -255,6 +258,11 @@ public class SchemaUtils {
         return schema.findField(deleteColumn.name()) != null;
     }
 
+    private static boolean findColumns(
+            org.apache.iceberg.Schema schema, SchemaChangeColumn changeColumn) 
{
+        return schema.findField(changeColumn.oldName()) != null;
+    }
+
     public static SeaTunnelDataType<?> toSeaTunnelType(String fieldName, Type 
type) {
         return IcebergTypeMapper.mapping(fieldName, type);
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index f98f2cb312..2fc0fe8dca 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -25,8 +25,10 @@ import 
org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent;
 import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent;
 import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
 import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent;
 import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
@@ -46,6 +48,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -438,72 +441,165 @@ public interface JdbcDialect extends Serializable {
     /**
      * Refresh physical table schema by schema change event
      *
-     * @param sourceDialectName source dialect name
      * @param event schema change event
-     * @param jdbcConnectionProvider jdbc connection provider
-     * @param sinkTablePath sink table path
+     * @param connection jdbc connection
+     * @param tablePath sink table path
      */
-    default void refreshTableSchemaBySchemaChangeEvent(
-            String sourceDialectName,
-            AlterTableColumnEvent event,
-            JdbcConnectionProvider jdbcConnectionProvider,
-            TablePath sinkTablePath) {}
+    default void applySchemaChange(
+            SchemaChangeEvent event, Connection connection, TablePath 
tablePath)
+            throws SQLException {
+        if (event instanceof AlterTableColumnsEvent) {
+            for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent) 
event).getEvents()) {
+                applySchemaChange(columnEvent, connection, tablePath);
+            }
+        } else {
+            if (event instanceof AlterTableChangeColumnEvent) {
+                AlterTableChangeColumnEvent changeColumnEvent = 
(AlterTableChangeColumnEvent) event;
+                if (!changeColumnEvent
+                        .getOldColumn()
+                        .equals(changeColumnEvent.getColumn().getName())) {
+                    if (!columnExists(connection, tablePath, 
changeColumnEvent.getOldColumn())
+                            && columnExists(
+                                    connection,
+                                    tablePath,
+                                    changeColumnEvent.getColumn().getName())) {
+                        log.warn(
+                                "Column {} already exists in table {}. Skipping change column operation. event: {}",
+                                changeColumnEvent.getColumn().getName(),
+                                tablePath.getFullName(),
+                                event);
+                        return;
+                    }
+                }
+                applySchemaChange(connection, tablePath, changeColumnEvent);
+            } else if (event instanceof AlterTableModifyColumnEvent) {
+                applySchemaChange(connection, tablePath, 
(AlterTableModifyColumnEvent) event);
+            } else if (event instanceof AlterTableAddColumnEvent) {
+                AlterTableAddColumnEvent addColumnEvent = 
(AlterTableAddColumnEvent) event;
+                if (columnExists(connection, tablePath, 
addColumnEvent.getColumn().getName())) {
+                    log.warn(
+                            "Column {} already exists in table {}. Skipping add column operation. event: {}",
+                            addColumnEvent.getColumn().getName(),
+                            tablePath.getFullName(),
+                            event);
+                    return;
+                }
+                applySchemaChange(connection, tablePath, addColumnEvent);
+            } else if (event instanceof AlterTableDropColumnEvent) {
+                AlterTableDropColumnEvent dropColumnEvent = 
(AlterTableDropColumnEvent) event;
+                if (!columnExists(connection, tablePath, 
dropColumnEvent.getColumn())) {
+                    log.warn(
+                            "Column {} does not exist in table {}. Skipping drop column operation. event: {}",
+                            dropColumnEvent.getColumn(),
+                            tablePath.getFullName(),
+                            event);
+                    return;
+                }
+                applySchemaChange(connection, tablePath, dropColumnEvent);
+            } else {
+                throw new SeaTunnelException(
+                        "Unsupported schemaChangeEvent : " + 
event.getEventType());
+            }
+        }
+    }
 
     /**
-     * generate alter table sql
+     * Check if the column exists in the table
      *
-     * @param sourceDialectName source dialect name
-     * @param event schema change event
-     * @param sinkTablePath sink table path
-     * @return alter table sql for sink table
+     * @param connection
+     * @param tablePath
+     * @param column
+     * @return
      */
-    default String generateAlterTableSql(
-            String sourceDialectName, AlterTableColumnEvent event, TablePath 
sinkTablePath) {
-        String tableIdentifierWithQuoted =
-                tableIdentifier(sinkTablePath.getDatabaseName(), 
sinkTablePath.getTableName());
-        switch (event.getEventType()) {
-            case SCHEMA_CHANGE_ADD_COLUMN:
-                Column addColumn = ((AlterTableAddColumnEvent) 
event).getColumn();
-                return buildAlterTableSql(
-                        sourceDialectName,
+    default boolean columnExists(Connection connection, TablePath tablePath, 
String column) {
+        String selectColumnSQL =
+                String.format(
+                        "SELECT %s FROM %s WHERE 1 != 1",
+                        quoteIdentifier(column), tableIdentifier(tablePath));
+        try (Statement statement = connection.createStatement()) {
+            return statement.execute(selectColumnSQL);
+        } catch (SQLException e) {
+            log.debug("Column {} does not exist in table {}", column, 
tablePath.getFullName(), e);
+            return false;
+        }
+    }
+
+    default void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableAddColumnEvent event)
+            throws SQLException {
+        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+        Column addColumn = event.getColumn();
+        String addColumnSQL =
+                buildAlterTableSql(
+                        event.getSourceDialectName(),
                         addColumn.getSourceType(),
                         AlterType.ADD.name(),
                         addColumn,
                         tableIdentifierWithQuoted,
                         StringUtils.EMPTY);
-            case SCHEMA_CHANGE_DROP_COLUMN:
-                String dropColumn = ((AlterTableDropColumnEvent) 
event).getColumn();
-                return buildAlterTableSql(
-                        sourceDialectName,
-                        null,
-                        AlterType.DROP.name(),
-                        null,
+        try (Statement statement = connection.createStatement()) {
+            log.info("Executing add column SQL: " + addColumnSQL);
+            statement.execute(addColumnSQL);
+        }
+    }
+
+    default void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableChangeColumnEvent event)
+            throws SQLException {
+        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+        Column changeColumn = event.getColumn();
+        String oldColumnName = event.getOldColumn();
+        String changeColumnSQL =
+                buildAlterTableSql(
+                        event.getSourceDialectName(),
+                        changeColumn.getSourceType(),
+                        AlterType.CHANGE.name(),
+                        changeColumn,
                         tableIdentifierWithQuoted,
-                        dropColumn);
-            case SCHEMA_CHANGE_MODIFY_COLUMN:
-                Column modifyColumn = ((AlterTableModifyColumnEvent) 
event).getColumn();
-                return buildAlterTableSql(
-                        sourceDialectName,
+                        oldColumnName);
+
+        try (Statement statement = connection.createStatement()) {
+            log.info("Executing change column SQL: " + changeColumnSQL);
+            statement.execute(changeColumnSQL);
+        }
+    }
+
+    default void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableModifyColumnEvent event)
+            throws SQLException {
+        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+        Column modifyColumn = event.getColumn();
+        String modifyColumnSQL =
+                buildAlterTableSql(
+                        event.getSourceDialectName(),
                         modifyColumn.getSourceType(),
                         AlterType.MODIFY.name(),
                         modifyColumn,
                         tableIdentifierWithQuoted,
                         StringUtils.EMPTY);
-            case SCHEMA_CHANGE_CHANGE_COLUMN:
-                AlterTableChangeColumnEvent alterTableChangeColumnEvent =
-                        (AlterTableChangeColumnEvent) event;
-                Column changeColumn = alterTableChangeColumnEvent.getColumn();
-                String oldColumnName = 
alterTableChangeColumnEvent.getOldColumn();
-                return buildAlterTableSql(
-                        sourceDialectName,
-                        changeColumn.getSourceType(),
-                        AlterType.CHANGE.name(),
-                        changeColumn,
+
+        try (Statement statement = connection.createStatement()) {
+            log.info("Executing modify column SQL: " + modifyColumnSQL);
+            statement.execute(modifyColumnSQL);
+        }
+    }
+
+    default void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableDropColumnEvent event)
+            throws SQLException {
+        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+        String dropColumn = event.getColumn();
+        String dropColumnSQL =
+                buildAlterTableSql(
+                        event.getSourceDialectName(),
+                        null,
+                        AlterType.DROP.name(),
+                        null,
                         tableIdentifierWithQuoted,
-                        oldColumnName);
-            default:
-                throw new SeaTunnelException(
-                        "Unsupported schemaChangeEvent for event type: " + 
event.getEventType());
+                        dropColumn);
+        try (Statement statement = connection.createStatement()) {
+            log.info("Executing drop column SQL: " + dropColumnSQL);
+            statement.execute(dropColumnSQL);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index fd0af3d9ff..22431b0d96 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -19,10 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
-import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -89,6 +85,11 @@ public class MysqlDialect implements JdbcDialect {
         return "`" + identifier + "`";
     }
 
+    @Override
+    public String tableIdentifier(TablePath tablePath) {
+        return tableIdentifier(tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
     @Override
     public Optional<String> getUpsertStatement(
             String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
@@ -226,24 +227,6 @@ public class MysqlDialect implements JdbcDialect {
         return SQLUtils.countForSubquery(connection, table.getQuery());
     }
 
-    @Override
-    public void refreshTableSchemaBySchemaChangeEvent(
-            String sourceDialectName,
-            AlterTableColumnEvent event,
-            JdbcConnectionProvider refreshTableSchemaConnectionProvider,
-            TablePath sinkTablePath) {
-        try (Connection connection =
-                        
refreshTableSchemaConnectionProvider.getOrEstablishConnection();
-                Statement stmt = connection.createStatement()) {
-            String alterTableSql = generateAlterTableSql(sourceDialectName, 
event, sinkTablePath);
-            log.info("Apply schema change with sql: {}", alterTableSql);
-            stmt.execute(alterTableSql);
-        } catch (Exception e) {
-            throw new JdbcConnectorException(
-                    
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
-        }
-    }
-
     @Override
     public String decorateWithComment(String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
         MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
index 83d3220b12..1c5d7734fb 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
@@ -19,10 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbas
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
-import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -92,6 +88,11 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
         return "`" + identifier + "`";
     }
 
+    @Override
+    public String tableIdentifier(TablePath tablePath) {
+        return tableIdentifier(tablePath.getDatabaseName(), 
tablePath.getTableName());
+    }
+
     @Override
     public Optional<String> getUpsertStatement(
             String database, String tableName, String[] fieldNames, String[] 
uniqueKeyFields) {
@@ -229,24 +230,6 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
         return SQLUtils.countForSubquery(connection, table.getQuery());
     }
 
-    @Override
-    public void refreshTableSchemaBySchemaChangeEvent(
-            String sourceDialectName,
-            AlterTableColumnEvent event,
-            JdbcConnectionProvider refreshTableSchemaConnectionProvider,
-            TablePath sinkTablePath) {
-        try (Connection connection =
-                        
refreshTableSchemaConnectionProvider.getOrEstablishConnection();
-                Statement stmt = connection.createStatement()) {
-            String alterTableSql = generateAlterTableSql(sourceDialectName, 
event, sinkTablePath);
-            log.info("Apply schema change with sql: {}", alterTableSql);
-            stmt.execute(alterTableSql);
-        } catch (Exception e) {
-            throw new JdbcConnectorException(
-                    
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
-        }
-    }
-
     @Override
     public String decorateWithComment(String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
         OceanBaseMysqlType nativeType = (OceanBaseMysqlType) 
typeBasicTypeDefine.getNativeType();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index ca7c457b7d..f894999a42 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -47,6 +47,7 @@ import org.apache.commons.lang3.StringUtils;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.util.List;
 
 @Slf4j
@@ -66,22 +67,22 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
     public void applySchemaChange(SchemaChangeEvent event) throws IOException {
         if (event instanceof AlterTableColumnsEvent) {
             AlterTableColumnsEvent alterTableColumnsEvent = 
(AlterTableColumnsEvent) event;
-            String sourceDialectName = 
alterTableColumnsEvent.getSourceDialectName();
-            if (StringUtils.isBlank(sourceDialectName)) {
-                throw new SeaTunnelException(
-                        "The sourceDialectName in AlterTableColumnEvent can not be empty");
-            }
             List<AlterTableColumnEvent> events = 
alterTableColumnsEvent.getEvents();
             for (AlterTableColumnEvent alterTableColumnEvent : events) {
-                processSchemaChangeEvent(alterTableColumnEvent, 
sourceDialectName);
+                String sourceDialectName = 
alterTableColumnEvent.getSourceDialectName();
+                if (StringUtils.isBlank(sourceDialectName)) {
+                    throw new SeaTunnelException(
+                            "The sourceDialectName in AlterTableColumnEvent can not be empty. event: "
+                                    + event);
+                }
+                processSchemaChangeEvent(alterTableColumnEvent);
             }
         } else {
             log.warn("We only support AlterTableColumnsEvent, but actual event is " + event);
         }
     }
 
-    protected void processSchemaChangeEvent(AlterTableColumnEvent event, 
String sourceDialectName)
-            throws IOException {
+    protected void processSchemaChangeEvent(AlterTableColumnEvent event) 
throws IOException {
         TableSchema newTableSchema = this.tableSchema.copy();
         List<Column> columns = newTableSchema.getColumns();
         switch (event.getEventType()) {
@@ -109,17 +110,16 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
                         "Unsupported schemaChangeEvent for event type: " + 
event.getEventType());
         }
         this.tableSchema = newTableSchema;
-        reOpenOutputFormat(event, sourceDialectName);
+        reOpenOutputFormat(event);
     }
 
-    protected void reOpenOutputFormat(AlterTableColumnEvent event, String 
sourceDialectName)
-            throws IOException {
+    protected void reOpenOutputFormat(AlterTableColumnEvent event) throws 
IOException {
         this.prepareCommit();
-        try {
-            JdbcConnectionProvider refreshTableSchemaConnectionProvider =
-                    
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
-            dialect.refreshTableSchemaBySchemaChangeEvent(
-                    sourceDialectName, event, 
refreshTableSchemaConnectionProvider, sinkTablePath);
+        JdbcConnectionProvider refreshTableSchemaConnectionProvider =
+                
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
+        try (Connection connection =
+                
refreshTableSchemaConnectionProvider.getOrEstablishConnection()) {
+            dialect.applySchemaChange(event, connection, sinkTablePath);
         } catch (Throwable e) {
             throw new JdbcConnectorException(
                     
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
index 0594a42d95..632b643bb2 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
@@ -50,5 +50,8 @@ sink {
     database = shop
     table = mysql_cdc_e2e_sink_table_with_schema_change
     primary_keys = ["id"]
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
   }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
index dd688a5ef5..8433f155e9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
@@ -64,5 +64,8 @@ sink {
     iceberg.table.upsert-mode-enabled=true
     iceberg.table.schema-evolution-enabled=true
     case_sensitive=true
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
   }
 }

Reply via email to