This is an automated email from the ASF dual-hosted git repository.

dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e1f0a238f7 [Improve][Jdbc] Refactor ddl change (#8134)
e1f0a238f7 is described below

commit e1f0a238f7bc325e1ab6c4db009a544ac34a0c5e
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 26 19:02:52 2024 +0800

    [Improve][Jdbc] Refactor ddl change (#8134)
---
 .../jdbc/internal/dialect/JdbcDialect.java         | 453 ++++++++++-----------
 .../jdbc/internal/dialect/mysql/MysqlDialect.java  |  15 +-
 .../oceanbase/OceanBaseMySqlTypeConverter.java     |   2 +
 .../dialect/oceanbase/OceanBaseMysqlDialect.java   |  15 +-
 .../internal/dialect/oracle/OracleDialect.java     | 162 +++++++-
 .../jdbc/sink/AbstractJdbcSinkWriter.java          |  81 +---
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |  23 --
 7 files changed, 382 insertions(+), 369 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 953bb30ee7..56b2b285c8 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,10 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
-import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
-import org.apache.seatunnel.api.table.converter.ConverterLoader;
 import org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
@@ -29,7 +27,7 @@ import 
org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.api.table.type.SqlType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
@@ -82,6 +80,15 @@ public interface JdbcDialect extends Serializable {
      */
     JdbcRowConverter getRowConverter();
 
+    /**
+     * Get the converter that converts type objects to SeaTunnel internal types.
+     *
+     * @return a type converter for the database
+     */
+    default TypeConverter<BasicTypeDefine> getTypeConverter() {
+        throw new UnsupportedOperationException("TypeConverter is not 
supported");
+    }
+
     /**
      * get jdbc meta-information type to seatunnel data type mapper.
      *
@@ -441,16 +448,16 @@ public interface JdbcDialect extends Serializable {
     /**
      * Refresh physical table schema by schema change event
      *
-     * @param event schema change event
      * @param connection jdbc connection
      * @param tablePath sink table path
+     * @param event schema change event
      */
     default void applySchemaChange(
-            SchemaChangeEvent event, Connection connection, TablePath 
tablePath)
+            Connection connection, TablePath tablePath, SchemaChangeEvent 
event)
             throws SQLException {
         if (event instanceof AlterTableColumnsEvent) {
             for (AlterTableColumnEvent columnEvent : ((AlterTableColumnsEvent) 
event).getEvents()) {
-                applySchemaChange(columnEvent, connection, tablePath);
+                applySchemaChange(connection, tablePath, columnEvent);
             }
         } else {
             if (event instanceof AlterTableChangeColumnEvent) {
@@ -497,8 +504,7 @@ public interface JdbcDialect extends Serializable {
                 }
                 applySchemaChange(connection, tablePath, dropColumnEvent);
             } else {
-                throw new SeaTunnelException(
-                        "Unsupported schemaChangeEvent : " + 
event.getEventType());
+                throw new UnsupportedOperationException("Unsupported 
schemaChangeEvent: " + event);
             }
         }
     }
@@ -527,20 +533,60 @@ public interface JdbcDialect extends Serializable {
     default void applySchemaChange(
             Connection connection, TablePath tablePath, 
AlterTableAddColumnEvent event)
             throws SQLException {
-        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
-        Column addColumn = event.getColumn();
-        String afterColumn = event.getAfterColumn();
-        String addColumnSQL =
-                buildAlterTableSql(
-                        event.getSourceDialectName(),
-                        addColumn.getSourceType(),
-                        AlterType.ADD.name(),
-                        addColumn,
-                        tableIdentifierWithQuoted,
-                        StringUtils.EMPTY,
-                        afterColumn);
+        boolean sameCatalog = 
event.getSourceDialectName().equals(dialectName());
+        BasicTypeDefine typeDefine = 
getTypeConverter().reconvert(event.getColumn());
+        String columnType =
+                sameCatalog ? event.getColumn().getSourceType() : 
typeDefine.getColumnType();
+        StringBuilder sqlBuilder =
+                new StringBuilder()
+                        .append("ALTER TABLE")
+                        .append(" ")
+                        .append(tableIdentifier(tablePath))
+                        .append(" ")
+                        .append("ADD COLUMN")
+                        .append(" ")
+                        .append(quoteIdentifier(event.getColumn().getName()))
+                        .append(" ")
+                        .append(columnType);
+
+        // Only decorate with default value when source dialect is same as 
sink dialect
+        // TODO: Support cross-database default values for DDL statements
+        if (event.getColumn().getDefaultValue() == null) {
+            sqlBuilder.append(" ").append(event.getColumn().isNullable() ? 
"NULL" : "NOT NULL");
+        } else {
+            if (event.getColumn().isNullable()) {
+                sqlBuilder.append(" NULL");
+            } else if (sameCatalog) {
+                sqlBuilder.append(" ").append(event.getColumn().isNullable() ? 
"NULL" : "NOT NULL");
+            } else if 
(SqlType.TIMESTAMP.equals(event.getColumn().getDataType().getSqlType())) {
+                log.warn(
+                        "Default value is not supported for column {} in table 
{}. Skipping add column operation. event: {}",
+                        event.getColumn().getName(),
+                        tablePath.getFullName(),
+                        event);
+            } else {
+                sqlBuilder.append(" NOT NULL");
+            }
+            if (sameCatalog) {
+                sqlBuilder.append(" 
").append(sqlClauseWithDefaultValue(typeDefine));
+            }
+        }
+
+        if (event.getColumn().getComment() != null) {
+            sqlBuilder
+                    .append(" ")
+                    .append("COMMENT ")
+                    .append("'")
+                    .append(event.getColumn().getComment())
+                    .append("'");
+        }
+        if (event.getAfterColumn() != null) {
+            sqlBuilder.append(" ").append("AFTER 
").append(quoteIdentifier(event.getAfterColumn()));
+        }
+
+        String addColumnSQL = sqlBuilder.toString();
         try (Statement statement = connection.createStatement()) {
-            log.info("Executing add column SQL: " + addColumnSQL);
+            log.info("Executing add column SQL: {}", addColumnSQL);
             statement.execute(addColumnSQL);
         }
     }
@@ -548,24 +594,79 @@ public interface JdbcDialect extends Serializable {
     default void applySchemaChange(
             Connection connection, TablePath tablePath, 
AlterTableChangeColumnEvent event)
             throws SQLException {
-        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
-        Column changeColumn = event.getColumn();
-        String oldColumnName = event.getOldColumn();
-        String afterColumn = event.getAfterColumn();
-        String changeColumnSQL =
-                buildAlterTableSql(
-                        event.getSourceDialectName(),
-                        changeColumn.getSourceType(),
-                        changeColumn.getDataType() == null
-                                ? AlterType.RENAME.name()
-                                : AlterType.CHANGE.name(),
-                        changeColumn,
-                        tableIdentifierWithQuoted,
-                        oldColumnName,
-                        afterColumn);
+        if (event.getColumn().getDataType() == null) {
+            StringBuilder sqlBuilder =
+                    new StringBuilder()
+                            .append("ALTER TABLE")
+                            .append(" ")
+                            .append(tableIdentifier(tablePath))
+                            .append(" ")
+                            .append("RENAME COLUMN")
+                            .append(" ")
+                            .append(quoteIdentifier(event.getOldColumn()))
+                            .append(" TO ")
+                            
.append(quoteIdentifier(event.getColumn().getName()));
+            try (Statement statement = connection.createStatement()) {
+                log.info("Executing rename column SQL: {}", sqlBuilder);
+                statement.execute(sqlBuilder.toString());
+            }
+            return;
+        }
+
+        boolean sameCatalog = 
event.getSourceDialectName().equals(dialectName());
+        BasicTypeDefine typeDefine = 
getTypeConverter().reconvert(event.getColumn());
+        String columnType =
+                sameCatalog ? event.getColumn().getSourceType() : 
typeDefine.getColumnType();
+        StringBuilder sqlBuilder =
+                new StringBuilder()
+                        .append("ALTER TABLE")
+                        .append(" ")
+                        .append(tableIdentifier(tablePath))
+                        .append(" ")
+                        .append("CHANGE COLUMN")
+                        .append(" ")
+                        .append(quoteIdentifier(event.getOldColumn()))
+                        .append(" ")
+                        .append(quoteIdentifier(event.getColumn().getName()))
+                        .append(" ")
+                        .append(columnType);
+        // Only decorate with default value when source dialect is same as 
sink dialect
+        // TODO: Support cross-database default values for DDL statements
+        if (event.getColumn().getDefaultValue() == null) {
+            sqlBuilder.append(" ").append(event.getColumn().isNullable() ? 
"NULL" : "NOT NULL");
+        } else {
+            if (event.getColumn().isNullable()) {
+                sqlBuilder.append(" NULL");
+            } else if (sameCatalog) {
+                sqlBuilder.append(" ").append(event.getColumn().isNullable() ? 
"NULL" : "NOT NULL");
+            } else if 
(SqlType.TIMESTAMP.equals(event.getColumn().getDataType().getSqlType())) {
+                log.warn(
+                        "Default value is not supported for column {} in table 
{}. Skipping add column operation. event: {}",
+                        event.getColumn().getName(),
+                        tablePath.getFullName(),
+                        event);
+            } else {
+                sqlBuilder.append(" NOT NULL");
+            }
+            if (sameCatalog) {
+                sqlBuilder.append(" 
").append(sqlClauseWithDefaultValue(typeDefine));
+            }
+        }
+        if (event.getColumn().getComment() != null) {
+            sqlBuilder
+                    .append(" ")
+                    .append("COMMENT ")
+                    .append("'")
+                    .append(event.getColumn().getComment())
+                    .append("'");
+        }
+        if (event.getAfterColumn() != null) {
+            sqlBuilder.append(" ").append("AFTER 
").append(quoteIdentifier(event.getAfterColumn()));
+        }
 
+        String changeColumnSQL = sqlBuilder.toString();
         try (Statement statement = connection.createStatement()) {
-            log.info("Executing change column SQL: " + changeColumnSQL);
+            log.info("Executing change column SQL: {}", changeColumnSQL);
             statement.execute(changeColumnSQL);
         }
     }
@@ -573,21 +674,60 @@ public interface JdbcDialect extends Serializable {
     default void applySchemaChange(
             Connection connection, TablePath tablePath, 
AlterTableModifyColumnEvent event)
             throws SQLException {
-        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
-        Column modifyColumn = event.getColumn();
-        String afterColumn = event.getAfterColumn();
-        String modifyColumnSQL =
-                buildAlterTableSql(
-                        event.getSourceDialectName(),
-                        modifyColumn.getSourceType(),
-                        AlterType.MODIFY.name(),
-                        modifyColumn,
-                        tableIdentifierWithQuoted,
-                        StringUtils.EMPTY,
-                        afterColumn);
 
+        boolean sameCatalog = 
event.getSourceDialectName().equals(dialectName());
+        BasicTypeDefine typeDefine = 
getTypeConverter().reconvert(event.getColumn());
+        String columnType =
+                sameCatalog ? event.getColumn().getSourceType() : 
typeDefine.getColumnType();
+        StringBuilder sqlBuilder =
+                new StringBuilder()
+                        .append("ALTER TABLE")
+                        .append(" ")
+                        .append(tableIdentifier(tablePath))
+                        .append(" ")
+                        .append("MODIFY COLUMN")
+                        .append(" ")
+                        .append(quoteIdentifier(event.getColumn().getName()))
+                        .append(" ")
+                        .append(columnType);
+
+        // Only decorate with default value when source dialect is same as 
sink dialect
+        // TODO: Support cross-database default values for DDL statements
+        if (event.getColumn().getDefaultValue() == null) {
+            sqlBuilder.append(" ").append(event.getColumn().isNullable() ? 
"NULL" : "NOT NULL");
+        } else {
+            if (event.getColumn().isNullable()) {
+                sqlBuilder.append(" NULL");
+            } else if (sameCatalog) {
+                sqlBuilder.append(" ").append(event.getColumn().isNullable() ? 
"NULL" : "NOT NULL");
+            } else if 
(SqlType.TIMESTAMP.equals(event.getColumn().getDataType().getSqlType())) {
+                log.warn(
+                        "Default value is not supported for column {} in table 
{}. Skipping add column operation. event: {}",
+                        event.getColumn().getName(),
+                        tablePath.getFullName(),
+                        event);
+            } else {
+                sqlBuilder.append(" NOT NULL");
+            }
+            if (sameCatalog) {
+                sqlBuilder.append(" 
").append(sqlClauseWithDefaultValue(typeDefine));
+            }
+        }
+        if (event.getColumn().getComment() != null) {
+            sqlBuilder
+                    .append(" ")
+                    .append("COMMENT ")
+                    .append("'")
+                    .append(event.getColumn().getComment())
+                    .append("'");
+        }
+        if (event.getAfterColumn() != null) {
+            sqlBuilder.append(" ").append("AFTER 
").append(quoteIdentifier(event.getAfterColumn()));
+        }
+
+        String modifyColumnSQL = sqlBuilder.toString();
         try (Statement statement = connection.createStatement()) {
-            log.info("Executing modify column SQL: " + modifyColumnSQL);
+            log.info("Executing modify column SQL: {}", modifyColumnSQL);
             statement.execute(modifyColumnSQL);
         }
     }
@@ -595,213 +735,40 @@ public interface JdbcDialect extends Serializable {
     default void applySchemaChange(
             Connection connection, TablePath tablePath, 
AlterTableDropColumnEvent event)
             throws SQLException {
-        String tableIdentifierWithQuoted = tableIdentifier(tablePath);
-        String dropColumn = event.getColumn();
         String dropColumnSQL =
-                buildAlterTableSql(
-                        event.getSourceDialectName(),
-                        null,
-                        AlterType.DROP.name(),
-                        null,
-                        tableIdentifierWithQuoted,
-                        dropColumn,
-                        null);
+                String.format(
+                        "ALTER TABLE %s DROP COLUMN %s",
+                        tableIdentifier(tablePath), 
quoteIdentifier(event.getColumn()));
         try (Statement statement = connection.createStatement()) {
-            log.info("Executing drop column SQL: " + dropColumnSQL);
+            log.info("Executing drop column SQL: {}", dropColumnSQL);
             statement.execute(dropColumnSQL);
         }
     }
 
     /**
-     * build alter table sql
-     *
-     * @param sourceDialectName source dialect name
-     * @param sourceColumnType source column type
-     * @param alterOperation alter operation of ddl
-     * @param newColumn new column after ddl
-     * @param tableName table name of sink table
-     * @param oldColumnName old column name before ddl
-     * @param afterColumn column before the new column
-     * @return alter table sql for sink table after schema change
-     */
-    default String buildAlterTableSql(
-            String sourceDialectName,
-            String sourceColumnType,
-            String alterOperation,
-            Column newColumn,
-            String tableName,
-            String oldColumnName,
-            String afterColumn) {
-        if (StringUtils.equals(alterOperation, AlterType.DROP.name())) {
-            return String.format(
-                    "ALTER TABLE %s drop column %s", tableName, 
quoteIdentifier(oldColumnName));
-        }
-
-        if (alterOperation.equalsIgnoreCase(AlterType.RENAME.name())) {
-            return String.format(
-                    "ALTER TABLE %s RENAME COLUMN %s TO %s",
-                    tableName, oldColumnName, newColumn.getName());
-        }
-
-        TypeConverter<?> typeConverter = 
ConverterLoader.loadTypeConverter(dialectName());
-        BasicTypeDefine typeBasicTypeDefine = (BasicTypeDefine) 
typeConverter.reconvert(newColumn);
-
-        String basicSql = buildAlterTableBasicSql(alterOperation, tableName);
-        basicSql =
-                decorateWithColumnNameAndType(
-                        sourceDialectName,
-                        sourceColumnType,
-                        basicSql,
-                        alterOperation,
-                        newColumn,
-                        oldColumnName,
-                        typeBasicTypeDefine.getColumnType());
-        // Only decorate with default value when source dialect is same as 
sink dialect
-        // Todo Support for cross-database default values for ddl statements
-        if (sourceDialectName.equals(dialectName())) {
-            basicSql = decorateWithDefaultValue(basicSql, typeBasicTypeDefine);
-        }
-        basicSql = decorateWithNullable(basicSql, typeBasicTypeDefine, 
sourceDialectName);
-        basicSql = decorateWithComment(tableName, basicSql, 
typeBasicTypeDefine);
-        basicSql = decorateWithAfterColumn(basicSql, afterColumn);
-        return dialectName().equals(DatabaseIdentifier.ORACLE) ? basicSql : 
basicSql + ";";
-    }
-
-    /**
-     * build the body of alter table sql
-     *
-     * @param alterOperation alter operation of ddl
-     * @param tableName table name of sink table
-     * @return basic sql of alter table for sink table
-     */
-    default String buildAlterTableBasicSql(String alterOperation, String 
tableName) {
-        StringBuilder sql =
-                new StringBuilder(
-                        "ALTER TABLE "
-                                + tableName
-                                + StringUtils.SPACE
-                                + alterOperation
-                                + StringUtils.SPACE);
-        return sql.toString();
-    }
-
-    /**
-     * decorate the sql with column name and type
-     *
-     * @param sourceDialectName source dialect name
-     * @param sourceColumnType source column type
-     * @param basicSql basic sql of alter table for sink table
-     * @param alterOperation alter operation of ddl
-     * @param newColumn new column after ddl
-     * @param oldColumnName old column name before ddl
-     * @param columnType column type of new column
-     * @return basic sql with column name and type of alter table for sink 
table
-     */
-    default String decorateWithColumnNameAndType(
-            String sourceDialectName,
-            String sourceColumnType,
-            String basicSql,
-            String alterOperation,
-            Column newColumn,
-            String oldColumnName,
-            String columnType) {
-        StringBuilder sql = new StringBuilder(basicSql);
-        String oldColumnNameWithQuoted = quoteIdentifier(oldColumnName);
-        String newColumnNameWithQuoted = quoteIdentifier(newColumn.getName());
-        if (alterOperation.equals(AlterType.CHANGE.name())) {
-            sql.append(oldColumnNameWithQuoted)
-                    .append(StringUtils.SPACE)
-                    .append(newColumnNameWithQuoted)
-                    .append(StringUtils.SPACE);
-        } else {
-            sql.append(newColumnNameWithQuoted).append(StringUtils.SPACE);
-        }
-        if (sourceDialectName.equals(dialectName())) {
-            sql.append(sourceColumnType);
-        } else {
-            sql.append(columnType);
-        }
-        sql.append(StringUtils.SPACE);
-        return sql.toString();
-    }
-
-    /**
-     * decorate with nullable
-     *
-     * @param basicSql alter table sql for sink table
-     * @param typeBasicTypeDefine type basic type define of new column
-     * @param sourceDialectName source dialect name
-     * @return alter table sql with nullable for sink table
-     */
-    default String decorateWithNullable(
-            String basicSql, BasicTypeDefine typeBasicTypeDefine, String 
sourceDialectName) {
-        StringBuilder sql = new StringBuilder(basicSql);
-        if (typeBasicTypeDefine.isNullable()
-                && !dialectName().equalsIgnoreCase(DatabaseIdentifier.ORACLE)) 
{
-            sql.append("NULL ");
-        } else {
-            // Todo: Support cross-dabaase default values for ddl statements 
which can remove this
-            if (!(!dialectName().equalsIgnoreCase(sourceDialectName)
-                    && dialectName().equalsIgnoreCase(DatabaseIdentifier.MYSQL)
-                    && 
typeBasicTypeDefine.getDataType().equalsIgnoreCase("datetime"))) {
-                sql.append("NOT NULL ");
-            }
-        }
-        return sql.toString();
-    }
-
-    /**
-     * decorate with default value
+     * Get the SQL clause that defines a column's default value
      *
-     * @param basicSql alter table sql for sink table
-     * @param typeBasicTypeDefine type basic type define of new column
-     * @return alter table sql with default value for sink table
+     * @param columnDefine column definition
+     * @return SQL clause that defines the default value
      */
-    default String decorateWithDefaultValue(String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
-        Object defaultValue = typeBasicTypeDefine.getDefaultValue();
+    default String sqlClauseWithDefaultValue(BasicTypeDefine columnDefine) {
+        Object defaultValue = columnDefine.getDefaultValue();
         if (Objects.nonNull(defaultValue)
-                && 
needsQuotesWithDefaultValue(typeBasicTypeDefine.getColumnType())
+                && needsQuotesWithDefaultValue(columnDefine.getColumnType())
                 && !isSpecialDefaultValue(defaultValue)) {
             defaultValue = quotesDefaultValue(defaultValue);
         }
-        StringBuilder sql = new StringBuilder(basicSql);
-        if (Objects.nonNull(defaultValue)) {
-            sql.append("DEFAULT 
").append(defaultValue).append(StringUtils.SPACE);
-        }
-        return sql.toString();
+        return "DEFAULT " + defaultValue;
     }
 
     /**
-     * decorate with comment
+     * Whether the column supports a default value
      *
-     * @param tableName table name with quoted
-     * @param basicSql alter table sql for sink table
-     * @param typeBasicTypeDefine type basic type define of new column
-     * @return alter table sql with comment for sink table
+     * @param columnDefine column definition
+     * @return whether setting a default value is supported
      */
-    default String decorateWithComment(
-            String tableName, String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
-        String comment = typeBasicTypeDefine.getComment();
-        StringBuilder sql = new StringBuilder(basicSql);
-        if (StringUtils.isNotBlank(comment)) {
-            sql.append("COMMENT '").append(comment).append("'");
-        }
-        return sql.toString();
-    }
-
-    /**
-     * decorate with after
-     *
-     * @param basicSql alter table sql for sink table
-     * @param afterColumn column before the new column
-     * @return alter table sql with after for sink table
-     */
-    default String decorateWithAfterColumn(String basicSql, String 
afterColumn) {
-        StringBuilder sql = new StringBuilder(basicSql);
-        if (StringUtils.isNotBlank(afterColumn)) {
-            sql.append("AFTER ").append(afterColumn).append(StringUtils.SPACE);
-        }
-        return sql.toString();
+    default boolean supportDefaultValue(BasicTypeDefine columnDefine) {
+        return true;
     }
 
     /**
@@ -833,12 +800,4 @@ public interface JdbcDialect extends Serializable {
     default String quotesDefaultValue(Object defaultValue) {
         return "'" + defaultValue + "'";
     }
-
-    enum AlterType {
-        ADD,
-        DROP,
-        MODIFY,
-        CHANGE,
-        RENAME
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
index 163c030445..cc3f1300b5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -70,6 +71,12 @@ public class MysqlDialect implements JdbcDialect {
         return new MysqlJdbcRowConverter();
     }
 
+    @Override
+    public TypeConverter<BasicTypeDefine> getTypeConverter() {
+        TypeConverter typeConverter = MySqlTypeConverter.DEFAULT_INSTANCE;
+        return typeConverter;
+    }
+
     @Override
     public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
         return new MySqlTypeMapper();
@@ -228,13 +235,9 @@ public class MysqlDialect implements JdbcDialect {
     }
 
     @Override
-    public String decorateWithComment(
-            String tableName, String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
+    public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
         MysqlType nativeType = (MysqlType) typeBasicTypeDefine.getNativeType();
-        if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
-            return basicSql;
-        }
-        return JdbcDialect.super.decorateWithComment(tableName, basicSql, 
typeBasicTypeDefine);
+        return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
index 78c8415a88..72c771f9ed 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMySqlTypeConverter.java
@@ -104,6 +104,8 @@ public class OceanBaseMySqlTypeConverter
     private static final String VECTOR_TYPE_NAME = "";
     private static final String VECTOR_NAME = "VECTOR";
 
+    public static final OceanBaseMySqlTypeConverter INSTANCE = new 
OceanBaseMySqlTypeConverter();
+
     @Override
     public String identifier() {
         return DatabaseIdentifier.OCENABASE;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
index 1824d6c76a..315eccaa52 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseMysqlDialect.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbas
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -73,6 +74,12 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
         return new OceanBaseMysqlJdbcRowConverter();
     }
 
+    @Override
+    public TypeConverter<BasicTypeDefine> getTypeConverter() {
+        TypeConverter typeConverter = OceanBaseMySqlTypeConverter.INSTANCE;
+        return typeConverter;
+    }
+
     @Override
     public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
         return new OceanBaseMySqlTypeMapper();
@@ -231,13 +238,9 @@ public class OceanBaseMysqlDialect implements JdbcDialect {
     }
 
     @Override
-    public String decorateWithComment(
-            String tableName, String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
+    public boolean supportDefaultValue(BasicTypeDefine typeBasicTypeDefine) {
         OceanBaseMysqlType nativeType = (OceanBaseMysqlType) 
typeBasicTypeDefine.getNativeType();
-        if (NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType)) {
-            return basicSql;
-        }
-        return JdbcDialect.super.decorateWithComment(tableName, basicSql, 
typeBasicTypeDefine);
+        return !(NOT_SUPPORTED_DEFAULT_VALUES.contains(nativeType));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index b314302ba4..b80620a4ce 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -17,8 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -64,6 +70,11 @@ public class OracleDialect implements JdbcDialect {
         return new OracleJdbcRowConverter();
     }
 
+    @Override
+    public TypeConverter<BasicTypeDefine> getTypeConverter() {
+        return OracleTypeConverter.INSTANCE;
+    }
+
     @Override
     public String hashModForField(String fieldName, int mod) {
         return "MOD(ORA_HASH(" + quoteIdentifier(fieldName) + ")," + mod + ")";
@@ -329,17 +340,146 @@ public class OracleDialect implements JdbcDialect {
     }
 
     @Override
-    public String decorateWithComment(
-            String tableName, String basicSql, BasicTypeDefine 
typeBasicTypeDefine) {
-        String comment = typeBasicTypeDefine.getComment();
-        StringBuilder sql = new StringBuilder(basicSql);
-        if (StringUtils.isNotBlank(comment)) {
-            String commentSql =
-                    String.format(
-                            "COMMENT ON COLUMN %s.%s IS '%s'",
-                            tableName, 
quoteIdentifier(typeBasicTypeDefine.getName()), comment);
-            sql.append(";\n").append(commentSql);
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableAddColumnEvent event)
+            throws SQLException {
+        List<String> ddlSQL = new ArrayList<>();
+        ddlSQL.add(buildUpdateColumnSQL(connection, tablePath, event));
+
+        if (event.getColumn().getComment() != null) {
+            ddlSQL.add(buildUpdateColumnCommentSQL(tablePath, 
event.getColumn()));
+        }
+
+        try (Statement statement = connection.createStatement()) {
+            for (String sql : ddlSQL) {
+                log.info("Executing add column SQL: {}", sql);
+                statement.execute(sql);
+            }
+        }
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableChangeColumnEvent event)
+            throws SQLException {
+        List<String> ddlSQL = new ArrayList<>();
+        if (event.getOldColumn() != null
+                && 
!(event.getColumn().getName().equals(event.getOldColumn()))) {
+            StringBuilder sqlBuilder =
+                    new StringBuilder()
+                            .append("ALTER TABLE ")
+                            .append(tableIdentifier(tablePath))
+                            .append(" RENAME COLUMN ")
+                            .append(quoteIdentifier(event.getOldColumn()))
+                            .append(" TO ")
+                            
.append(quoteIdentifier(event.getColumn().getName()));
+            ddlSQL.add(sqlBuilder.toString());
+        }
+
+        try (Statement statement = connection.createStatement()) {
+            for (String sql : ddlSQL) {
+                log.info("Executing change column SQL: {}", sql);
+                statement.execute(sql);
+            }
+        }
+
+        if (event.getColumn().getDataType() != null) {
+            applySchemaChange(
+                    connection,
+                    tablePath,
+                    
AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn()));
+        }
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableModifyColumnEvent event)
+            throws SQLException {
+        List<String> ddlSQL = new ArrayList<>();
+        ddlSQL.add(buildUpdateColumnSQL(connection, tablePath, event));
+
+        if (event.getColumn().getComment() != null) {
+            ddlSQL.add(buildUpdateColumnCommentSQL(tablePath, 
event.getColumn()));
+        }
+
+        try (Statement statement = connection.createStatement()) {
+            for (String sql : ddlSQL) {
+                log.info("Executing modify column SQL: {}", sql);
+                statement.execute(sql);
+            }
+        }
+    }
+
+    private String buildUpdateColumnSQL(
+            Connection connection, TablePath tablePath, AlterTableColumnEvent 
event)
+            throws SQLException {
+        String actionType;
+        Column column;
+        if (event instanceof AlterTableModifyColumnEvent) {
+            actionType = "MODIFY";
+            column = ((AlterTableModifyColumnEvent) event).getColumn();
+        } else if (event instanceof AlterTableAddColumnEvent) {
+            actionType = "ADD";
+            column = ((AlterTableAddColumnEvent) event).getColumn();
+        } else {
+            throw new IllegalArgumentException("Unsupported 
AlterTableColumnEvent: " + event);
+        }
+
+        boolean sameCatalog = 
event.getSourceDialectName().equals(dialectName());
+        BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+        String columnType = sameCatalog ? column.getSourceType() : 
typeDefine.getColumnType();
+        StringBuilder sqlBuilder =
+                new StringBuilder()
+                        .append("ALTER TABLE  ")
+                        .append(tableIdentifier(tablePath))
+                        .append(" ")
+                        .append(actionType)
+                        .append(" ")
+                        .append(quoteIdentifier(column.getName()))
+                        .append(" ")
+                        .append(columnType);
+        // Only decorate with default value when source dialect is same as 
sink dialect
+        // Todo Support for cross-database default values for ddl statements
+        if (column.getDefaultValue() != null && sameCatalog) {
+            sqlBuilder.append(" 
").append(sqlClauseWithDefaultValue(typeDefine));
+        }
+        if (event instanceof AlterTableModifyColumnEvent) {
+            boolean targetColumnNullable =
+                    columnIsNullable(connection, tablePath, column.getName());
+            if (column.isNullable() != targetColumnNullable) {
+                sqlBuilder.append(" ").append(column.isNullable() ? "NULL" : 
"NOT NULL");
+            }
+        } else {
+            sqlBuilder.append(" ").append(column.isNullable() ? "NULL" : "NOT 
NULL");
+        }
+        return sqlBuilder.toString();
+    }
+
+    private String buildUpdateColumnCommentSQL(TablePath tablePath, Column 
column) {
+        return String.format(
+                "COMMENT ON COLUMN %s.%s IS '%s'",
+                tableIdentifier(tablePath), quoteIdentifier(column.getName()), 
column.getComment());
+    }
+
+    private boolean columnIsNullable(Connection connection, TablePath 
tablePath, String column)
+            throws SQLException {
+        String selectColumnSQL =
+                "SELECT"
+                        + "        NULLABLE FROM"
+                        + "        ALL_TAB_COLUMNS c"
+                        + "        WHERE c.owner = '"
+                        + tablePath.getSchemaName()
+                        + "'"
+                        + "        AND c.table_name = '"
+                        + tablePath.getTableName()
+                        + "'"
+                        + "        AND c.column_name = '"
+                        + column
+                        + "'";
+        try (Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery(selectColumnSQL);
+            rs.next();
+            return rs.getString("NULLABLE").equals("Y");
         }
-        return sql.toString();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index 4d7a9ef2fd..67de53dfc5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -17,20 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
-import org.apache.seatunnel.api.event.EventType;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
-import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
-import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
-import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
-import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import 
org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -50,9 +45,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.sql.Connection;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 
 @Slf4j
 public abstract class AbstractJdbcSinkWriter<ResourceT>
@@ -67,6 +60,8 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
     protected JdbcConnectionProvider connectionProvider;
     protected JdbcSinkConfig jdbcSinkConfig;
     protected JdbcOutputFormat<SeaTunnelRow, 
JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
+    protected TableSchemaChangeEventDispatcher tableSchemaChanger =
+            new TableSchemaChangeEventDispatcher();
 
     @Override
     public void applySchemaChange(SchemaChangeEvent event) throws IOException {
@@ -88,57 +83,7 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
     }
 
     protected void processSchemaChangeEvent(AlterTableColumnEvent event) 
throws IOException {
-        List<Column> columns = new ArrayList<>(tableSchema.getColumns());
-        switch (event.getEventType()) {
-            case SCHEMA_CHANGE_ADD_COLUMN:
-                AlterTableAddColumnEvent alterTableAddColumnEvent =
-                        (AlterTableAddColumnEvent) event;
-                Column addColumn = alterTableAddColumnEvent.getColumn();
-                String afterColumn = alterTableAddColumnEvent.getAfterColumn();
-                if (StringUtils.isNotBlank(afterColumn)) {
-                    Optional<Column> columnOptional =
-                            columns.stream()
-                                    .filter(column -> 
afterColumn.equals(column.getName()))
-                                    .findFirst();
-                    if (!columnOptional.isPresent()) {
-                        columns.add(addColumn);
-                        break;
-                    }
-                    columnOptional.ifPresent(
-                            column -> {
-                                int index = columns.indexOf(column);
-                                columns.add(index + 1, addColumn);
-                            });
-                } else {
-                    columns.add(addColumn);
-                }
-                break;
-            case SCHEMA_CHANGE_DROP_COLUMN:
-                String dropColumn = ((AlterTableDropColumnEvent) 
event).getColumn();
-                columns.removeIf(column -> 
column.getName().equalsIgnoreCase(dropColumn));
-                break;
-            case SCHEMA_CHANGE_MODIFY_COLUMN:
-                Column modifyColumn = ((AlterTableModifyColumnEvent) 
event).getColumn();
-                replaceColumnByIndex(
-                        event.getEventType(), columns, modifyColumn.getName(), 
modifyColumn);
-                break;
-            case SCHEMA_CHANGE_CHANGE_COLUMN:
-                AlterTableChangeColumnEvent alterTableChangeColumnEvent =
-                        (AlterTableChangeColumnEvent) event;
-                Column changeColumn = alterTableChangeColumnEvent.getColumn();
-                String oldColumnName = 
alterTableChangeColumnEvent.getOldColumn();
-                replaceColumnByIndex(event.getEventType(), columns, 
oldColumnName, changeColumn);
-                break;
-            default:
-                throw new SeaTunnelException(
-                        "Unsupported schemaChangeEvent for event type: " + 
event.getEventType());
-        }
-        this.tableSchema =
-                TableSchema.builder()
-                        .columns(columns)
-                        .primaryKey(tableSchema.getPrimaryKey())
-                        .constraintKey(tableSchema.getConstraintKeys())
-                        .build();
+        this.tableSchema = tableSchemaChanger.reset(tableSchema).apply(event);
         reOpenOutputFormat(event);
     }
 
@@ -148,7 +93,7 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
                 
dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
         try (Connection connection =
                 
refreshTableSchemaConnectionProvider.getOrEstablishConnection()) {
-            dialect.applySchemaChange(event, connection, sinkTablePath);
+            dialect.applySchemaChange(connection, sinkTablePath, event);
         } catch (Throwable e) {
             throw new JdbcConnectorException(
                     
JdbcConnectorErrorCode.REFRESH_PHYSICAL_TABLESCHEMA_BY_SCHEMA_CHANGE_EVENT, e);
@@ -159,20 +104,4 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
                         .build();
         this.outputFormat.open();
     }
-
-    protected void replaceColumnByIndex(
-            EventType eventType, List<Column> columns, String oldColumnName, 
Column newColumn) {
-        for (int i = 0; i < columns.size(); i++) {
-            Column column = columns.get(i);
-            if (column.getName().equalsIgnoreCase(oldColumnName)) {
-                // rename ...... to ......  which just has column name
-                if (eventType.equals(EventType.SCHEMA_CHANGE_CHANGE_COLUMN)
-                        && newColumn.getDataType() == null) {
-                    columns.set(i, column.rename(newColumn.getName()));
-                } else {
-                    columns.set(i, newColumn);
-                }
-            }
-        }
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 41dd41ff9e..3f43b2088d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -17,9 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
 
-import org.apache.seatunnel.api.event.EventType;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
-import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -30,7 +28,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorExc
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
@@ -43,8 +40,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.api.event.EventType.SCHEMA_CHANGE_CHANGE_COLUMN;
-
 @Slf4j
 public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager> {
     private final Integer primaryKeyIndex;
@@ -167,22 +162,4 @@ public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager
             outputFormat.close();
         }
     }
-
-    @Override
-    protected void replaceColumnByIndex(
-            EventType eventType, List<Column> columns, String oldColumnName, 
Column newColumn) {
-        // The operation of renaming a column in Oracle is only supported to 
modify the column name,
-        // so we just modify the column name directly.
-        if (eventType.equals(SCHEMA_CHANGE_CHANGE_COLUMN) && dialect 
instanceof OracleDialect) {
-            for (int i = 0; i < columns.size(); i++) {
-                Column column = columns.get(i);
-                if (column.getName().equalsIgnoreCase(oldColumnName)) {
-                    column = column.rename(newColumn.getName());
-                    columns.set(i, column);
-                }
-            }
-            return;
-        }
-        super.replaceColumnByIndex(eventType, columns, oldColumnName, 
newColumn);
-    }
 }

Reply via email to