This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 23ab3edbbb [Hotfix][CDC] Fix ddl duplicate execution error when config
multi_table_sink_replica (#7634)
23ab3edbbb is described below
commit 23ab3edbbb4a0fd3a25e3317ccf5a60ed4428aa9
Author: hailin0 <[email protected]>
AuthorDate: Mon Sep 23 09:27:23 2024 +0800
[Hotfix][CDC] Fix ddl duplicate execution error when config
multi_table_sink_replica (#7634)
---
.../sink/multitablesink/MultiTableSinkWriter.java | 8 +
.../mysql/source/MySqlSchemaChangeResolver.java | 1 +
.../iceberg/sink/writer/IcebergRecordWriter.java | 2 +
.../seatunnel/iceberg/utils/SchemaUtils.java | 14 +-
.../jdbc/internal/dialect/JdbcDialect.java | 192 +++++++++++++++------
.../jdbc/internal/dialect/mysql/MysqlDialect.java | 27 +--
.../dialect/oceanbase/OceanBaseMysqlDialect.java | 27 +--
.../jdbc/sink/AbstractJdbcSinkWriter.java | 32 ++--
.../mysqlcdc_to_mysql_with_schema_change.conf | 3 +
.../mysql_cdc_to_iceberg_for_schema_change.conf | 3 +
10 files changed, 198 insertions(+), 111 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
index 649417351a..5a2f51fd16 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java
@@ -144,9 +144,17 @@ public class MultiTableSinkWriter
.getKey()
.getTableIdentifier()
.equals(event.tablePath().getFullName())) {
+ log.info(
+ "Start apply schema change for table {} sub-writer
{}",
+ sinkWriterEntry.getKey().getTableIdentifier(),
+ sinkWriterEntry.getKey().getIndex());
synchronized (runnable.get(i)) {
sinkWriterEntry.getValue().applySchemaChange(event);
}
+ log.info(
+ "Finish apply schema change for table {}
sub-writer {}",
+ sinkWriterEntry.getKey().getTableIdentifier(),
+ sinkWriterEntry.getKey().getIndex());
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
index bd386e9bb5..3ea4a0dfce 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlSchemaChangeResolver.java
@@ -64,6 +64,7 @@ public class MySqlSchemaChangeResolver extends
AbstractSchemaChangeResolver {
customMySqlAntlrDdlParser.parse(ddl, tables);
List<AlterTableColumnEvent> parsedEvents =
customMySqlAntlrDdlParser.getAndClearParsedEvents();
+ parsedEvents.forEach(e ->
e.setSourceDialectName(DatabaseIdentifier.MYSQL));
AlterTableColumnsEvent alterTableColumnsEvent =
new AlterTableColumnsEvent(
TableIdentifier.of(
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
index 06b48591df..ce83a8641e 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergRecordWriter.java
@@ -91,6 +91,8 @@ public class IcebergRecordWriter implements RecordWriter {
public void applySchemaChange(SeaTunnelRowType afterRowType,
SchemaChangeEvent event) {
log.info("Apply schema change start.");
SchemaChangeWrapper updates = new SchemaChangeWrapper();
+ // get the latest schema in case another process updated it
+ table.refresh();
Schema schema = table.schema();
if (event instanceof AlterTableDropColumnEvent) {
AlterTableDropColumnEvent dropColumnEvent =
(AlterTableDropColumnEvent) event;
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 5047746e9e..9aba4a777d 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -207,14 +207,17 @@ public class SchemaUtils {
.filter(updateType -> !typeMatches(table.schema(),
updateType))
.collect(toList());
- // filter out columns that have the updated type
+ // filter out columns that have already been deleted
List<SchemaDeleteColumn> deleteColumns =
wrapper.deleteColumns().stream()
.filter(deleteColumn -> findColumns(table.schema(),
deleteColumn))
.collect(toList());
- // Rename column name
- List<SchemaChangeColumn> changeColumns = new
ArrayList<>(wrapper.changeColumns());
+ // filter out columns that have already been changed
+ List<SchemaChangeColumn> changeColumns =
+ wrapper.changeColumns().stream()
+ .filter(changeColumn -> findColumns(table.schema(),
changeColumn))
+ .collect(toList());
if (addColumns.isEmpty()
&& modifyColumns.isEmpty()
@@ -255,6 +258,11 @@ public class SchemaUtils {
return schema.findField(deleteColumn.name()) != null;
}
+ private static boolean findColumns(
+ org.apache.iceberg.Schema schema, SchemaChangeColumn changeColumn)
{
+ return schema.findField(changeColumn.oldName()) != null;
+ }
+
public static SeaTunnelDataType<?> toSeaTunnelType(String fieldName, Type
type) {
return IcebergTypeMapper.mapping(fieldName, type);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index f98f2cb312..2fc0fe8dca 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -25,8 +25,10 @@ import
org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.api.table.event.AlterTableAddColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableChangeColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.event.AlterTableColumnsEvent;
import org.apache.seatunnel.api.table.event.AlterTableDropColumnEvent;
import org.apache.seatunnel.api.table.event.AlterTableModifyColumnEvent;
+import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
@@ -46,6 +48,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -438,72 +441,165 @@ public interface JdbcDialect extends Serializable {
/**
* Refresh physical table schema by schema change event
*
- * @param sourceDialectName source dialect name
* @param event schema change event
- * @param jdbcConnectionProvider jdbc connection provider
- * @param sinkTablePath sink table path
+ * @param connection jdbc connection
+ * @param tablePath sink table path
*/
- default void refreshTableSchemaBySchemaChangeEvent(
- String sourceDialectName,
- AlterTableColumnEvent event,
- JdbcConnectionProvider jdbcConnectionProvider,
- TablePath sinkTablePath) {}
+ default void applySchemaChange(
+ SchemaChangeEvent event, Connection connection, TablePath
tablePath)
+ throws SQLException {
+ if (event instanceof AlterTableColumnsEvent) {
+ for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent)
event).getEvents()) {
+ applySchemaChange(columnEvent, connection, tablePath);
+ }
+ } else {
+ if (event instanceof AlterTableChangeColumnEvent) {
+ AlterTableChangeColumnEvent changeColumnEvent =
(AlterTableChangeColumnEvent) event;
+ if (!changeColumnEvent
+ .getOldColumn()
+ .equals(changeColumnEvent.getColumn().getName())) {
+ if (!columnExists(connection, tablePath,
changeColumnEvent.getOldColumn())
+ && columnExists(
+ connection,
+ tablePath,
+ changeColumnEvent.getColumn().getName())) {
+ log.warn(
+ "Column {} already exists in table {}.
Skipping change column operation. event: {}",
+ changeColumnEvent.getColumn().getName(),
+ tablePath.getFullName(),
+ event);
+ return;
+ }
+ }
+ applySchemaChange(connection, tablePath, changeColumnEvent);
+ } else if (event instanceof AlterTableModifyColumnEvent) {
+ applySchemaChange(connection, tablePath,
(AlterTableModifyColumnEvent) event);
+ } else if (event instanceof AlterTableAddColumnEvent) {
+ AlterTableAddColumnEvent addColumnEvent =
(AlterTableAddColumnEvent) event;
+ if (columnExists(connection, tablePath,
addColumnEvent.getColumn().getName())) {
+ log.warn(
+ "Column {} already exists in table {}. Skipping
add column operation. event: {}",
+ addColumnEvent.getColumn().getName(),
+ tablePath.getFullName(),
+ event);
+ return;
+ }
+ applySchemaChange(connection, tablePath, addColumnEvent);
+ } else if (event instanceof AlterTableDropColumnEvent) {
+ AlterTableDropColumnEvent dropColumnEvent =
(AlterTableDropColumnEvent) event;
+ if (!columnExists(connection, tablePath,
dropColumnEvent.getColumn())) {
+ log.warn(
+ "Column {} does not exist in table {}. Skipping
drop column operation. event: {}",
+ dropColumnEvent.getColumn(),
+ tablePath.getFullName(),
+ event);
+ return;
+ }
+ applySchemaChange(connection, tablePath, dropColumnEvent);
+ } else {
+ throw new SeaTunnelException(
+ "Unsupported schemaChangeEvent : " +
event.getEventType());
+ }
+ }
+ }
/**
- * generate alter table sql
+ * Check if the column exists in the table
*
- * @param sourceDialectName source dialect name
- * @param event schema change event
- * @param sinkTablePath sink table path
- * @return alter table sql for sink table
+ * @param connection
+ * @param tablePath
+ * @param column
+ * @return
*/
- default String generateAlterTableSql(
- String sourceDialectName, AlterTableColumnEvent event, TablePath
sinkTablePath) {
- String tableIdentifierWithQuoted =
- tableIdentifier(sinkTablePath.getDatabaseName(),
sinkTablePath.getTableName());
- switch (event.getEventType()) {
- case SCHEMA_CHANGE_ADD_COLUMN:
- Column addColumn = ((AlterTableAddColumnEvent)
event).getColumn();
- return buildAlterTableSql(
- sourceDialectName,
+ default boolean columnExists(Connection connection, TablePath tablePath,
String column) {
+ String selectColumnSQL =
+ String.format(
+ "SELECT %s FROM %s WHERE 1 != 1",
+ quoteIdentifier(column), tableIdentifier(tablePath));
+ try (Statement statement = connection.createStatement()) {
+ return statement.execute(selectColumnSQL);
+ } catch (SQLException e) {
+ log.debug("Column {} does not exist in table {}", column,
tablePath.getFullName(), e);
+ return false;
+ }
+ }
+
+ default void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableAddColumnEvent event)
+ throws SQLException {
+ String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+ Column addColumn = event.getColumn();
+ String addColumnSQL =
+ buildAlterTableSql(
+ event.getSourceDialectName(),
addColumn.getSourceType(),
AlterType.ADD.name(),
addColumn,
tableIdentifierWithQuoted,
StringUtils.EMPTY);
- case SCHEMA_CHANGE_DROP_COLUMN:
- String dropColumn = ((AlterTableDropColumnEvent)
event).getColumn();
- return buildAlterTableSql(
- sourceDialectName,
- null,
- AlterType.DROP.name(),
- null,
+ try (Statement statement = connection.createStatement()) {
+ log.info("Executing add column SQL: " + addColumnSQL);
+ statement.execute(addColumnSQL);
+ }
+ }
+
+ default void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableChangeColumnEvent event)
+ throws SQLException {
+ String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+ Column changeColumn = event.getColumn();
+ String oldColumnName = event.getOldColumn();
+ String changeColumnSQL =
+ buildAlterTableSql(
+ event.getSourceDialectName(),
+ changeColumn.getSourceType(),
+ AlterType.CHANGE.name(),
+ changeColumn,
tableIdentifierWithQuoted,
- dropColumn);
- case SCHEMA_CHANGE_MODIFY_COLUMN:
- Column modifyColumn = ((AlterTableModifyColumnEvent)
event).getColumn();
- return buildAlterTableSql(
- sourceDialectName,
+ oldColumnName);
+
+ try (Statement statement = connection.createStatement()) {
+ log.info("Executing change column SQL: " + changeColumnSQL);
+ statement.execute(changeColumnSQL);
+ }
+ }
+
+ default void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableModifyColumnEvent event)
+ throws SQLException {
+ String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+ Column modifyColumn = event.getColumn();
+ String modifyColumnSQL =
+ buildAlterTableSql(
+ event.getSourceDialectName(),
modifyColumn.getSourceType(),
AlterType.MODIFY.name(),
modifyColumn,
tableIdentifierWithQuoted,
StringUtils.EMPTY);
- case SCHEMA_CHANGE_CHANGE_COLUMN:
- AlterTableChangeColumnEvent alterTableChangeColumnEvent =
- (AlterTableChangeColumnEvent) event;
- Column changeColumn = alterTableChangeColumnEvent.getColumn();
- String oldColumnName =
alterTableChangeColumnEvent.getOldColumn();
- return buildAlterTableSql(
- sourceDialectName,
- changeColumn.getSourceType(),
- AlterType.CHANGE.name(),
- changeColumn,
+
+ try (Statement statement = connection.createStatement()) {
+ log.info("Executing modify column SQL: " + modifyColumnSQL);
+ statement.execute(modifyColumnSQL);
+ }
+ }
+
+ default void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableDropColumnEvent event)
+ throws SQLException {
+ String tableIdentifierWithQuoted = tableIdentifier(tablePath);
+ String dropColumn = event.getColumn();
+ String dropColumnSQL =
+ buildAlterTableSql(
+ event.getSourceDialectName(),
+ null,
+ AlterType.DROP.name(),
+ null,
tableIdentifierWithQuoted,
- oldColumnName);
- default:
- throw new SeaTunnelException(
- "Unsupported schemaChangeEvent for event type: " +
event.getEventType());
+ dropColumn);
+ try (Statement statement = connection.createStatement()) {
+ log.info("Executing drop column SQL: " + dropColumnSQL);
+ statement.execute(dropColumnSQL);
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index fd0af3d9ff..22431b0d96 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -19,10 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
-import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -89,6 +85,11 @@ public class MysqlDialect implements JdbcDialect {
return "`" + identifier + "`";
}
+ @Override
+ public String tableIdentifier(TablePath tablePath) {
+ return tableIdentifier(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
@@ -226,24 +227,6 @@ public class MysqlDialect implements JdbcDialect {
return SQLUtils.countForSubquery(connection, table.getQuery());
}
- @Override
- public void refreshTableSchemaBySchemaChangeEvent(
- String sourceDialectName,
- AlterTableColumnEvent event,
- JdbcConnectionProvider refreshTableSchemaConnectionProvider,
- TablePath sinkTablePath) {
- try (Connection connection =
-
refreshTableSchemaConnectionProvider.getOrEstablishConnection();
- Statement stmt = connection.createStatement()) {
- String alterTableSql = generateAlterTableSql(sourceDialectName,
event, sinkTablePath);
- log.info("Apply schema change with sql: {}", alterTableSql);
- stmt.execute(alterTableSql);
- } catch (Exception e) {
- throw new JdbcConnectorException(
-
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
- }
- }
-
@Override
public String decorateWithComment(String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
index 83d3220b12..1c5d7734fb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
@@ -19,10 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbas
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
-import org.apache.seatunnel.api.table.event.AlterTableColumnEvent;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -92,6 +88,11 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
return "`" + identifier + "`";
}
+ @Override
+ public String tableIdentifier(TablePath tablePath) {
+ return tableIdentifier(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[]
uniqueKeyFields) {
@@ -229,24 +230,6 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
return SQLUtils.countForSubquery(connection, table.getQuery());
}
- @Override
- public void refreshTableSchemaBySchemaChangeEvent(
- String sourceDialectName,
- AlterTableColumnEvent event,
- JdbcConnectionProvider refreshTableSchemaConnectionProvider,
- TablePath sinkTablePath) {
- try (Connection connection =
-
refreshTableSchemaConnectionProvider.getOrEstablishConnection();
- Statement stmt = connection.createStatement()) {
- String alterTableSql = generateAlterTableSql(sourceDialectName,
event, sinkTablePath);
- log.info("Apply schema change with sql: {}", alterTableSql);
- stmt.execute(alterTableSql);
- } catch (Exception e) {
- throw new JdbcConnectorException(
-
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
- }
- }
-
@Override
public String decorateWithComment(String basicSql, BasicTypeDefine
typeBasicTypeDefine) {
OceanBaseMysqlType nativeType = (OceanBaseMysqlType)
typeBasicTypeDefine.getNativeType();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index ca7c457b7d..f894999a42 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -47,6 +47,7 @@ import org.apache.commons.lang3.StringUtils;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.sql.Connection;
import java.util.List;
@Slf4j
@@ -66,22 +67,22 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
public void applySchemaChange(SchemaChangeEvent event) throws IOException {
if (event instanceof AlterTableColumnsEvent) {
AlterTableColumnsEvent alterTableColumnsEvent =
(AlterTableColumnsEvent) event;
- String sourceDialectName =
alterTableColumnsEvent.getSourceDialectName();
- if (StringUtils.isBlank(sourceDialectName)) {
- throw new SeaTunnelException(
- "The sourceDialectName in AlterTableColumnEvent can
not be empty");
- }
List<AlterTableColumnEvent> events =
alterTableColumnsEvent.getEvents();
for (AlterTableColumnEvent alterTableColumnEvent : events) {
- processSchemaChangeEvent(alterTableColumnEvent,
sourceDialectName);
+ String sourceDialectName =
alterTableColumnEvent.getSourceDialectName();
+ if (StringUtils.isBlank(sourceDialectName)) {
+ throw new SeaTunnelException(
+ "The sourceDialectName in AlterTableColumnEvent
can not be empty. event: "
+ + event);
+ }
+ processSchemaChangeEvent(alterTableColumnEvent);
}
} else {
log.warn("We only support AlterTableColumnsEvent, but actual event
is " + event);
}
}
- protected void processSchemaChangeEvent(AlterTableColumnEvent event,
String sourceDialectName)
- throws IOException {
+ protected void processSchemaChangeEvent(AlterTableColumnEvent event)
throws IOException {
TableSchema newTableSchema = this.tableSchema.copy();
List<Column> columns = newTableSchema.getColumns();
switch (event.getEventType()) {
@@ -109,17 +110,16 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
"Unsupported schemaChangeEvent for event type: " +
event.getEventType());
}
this.tableSchema = newTableSchema;
- reOpenOutputFormat(event, sourceDialectName);
+ reOpenOutputFormat(event);
}
- protected void reOpenOutputFormat(AlterTableColumnEvent event, String
sourceDialectName)
- throws IOException {
+ protected void reOpenOutputFormat(AlterTableColumnEvent event) throws
IOException {
this.prepareCommit();
- try {
- JdbcConnectionProvider refreshTableSchemaConnectionProvider =
-
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
- dialect.refreshTableSchemaBySchemaChangeEvent(
- sourceDialectName, event,
refreshTableSchemaConnectionProvider, sinkTablePath);
+ JdbcConnectionProvider refreshTableSchemaConnectionProvider =
+
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
+ try (Connection connection =
+
refreshTableSchemaConnectionProvider.getOrEstablishConnection()) {
+ dialect.applySchemaChange(event, connection, sinkTablePath);
} catch (Throwable e) {
throw new JdbcConnectorException(
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
index 0594a42d95..632b643bb2 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql_with_schema_change.conf
@@ -50,5 +50,8 @@ sink {
database = shop
table = mysql_cdc_e2e_sink_table_with_schema_change
primary_keys = ["id"]
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
index dd688a5ef5..8433f155e9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/mysql_cdc_to_iceberg_for_schema_change.conf
@@ -64,5 +64,8 @@ sink {
iceberg.table.upsert-mode-enabled=true
iceberg.table.schema-evolution-enabled=true
case_sensitive=true
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
}
}