This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3fb05da365 [Improve][Connector-V2] Improve schema evolution on column insert after for mysql-jdbc (#8017)
3fb05da365 is described below

commit 3fb05da365649dddaeaf7cc21e167037b3bd40f6
Author: Shiwanming <[email protected]>
AuthorDate: Sat Nov 16 21:25:48 2024 +0800

    [Improve][Connector-V2] Improve schema evolution on column insert after for mysql-jdbc (#8017)
---
 .../jdbc/internal/dialect/JdbcDialect.java         | 35 ++++++++++++++++++----
 .../jdbc/sink/AbstractJdbcSinkWriter.java          | 24 +++++++++++++--
 .../src/test/resources/ddl/add_columns.sql         | 13 ++++++++
 .../src/test/resources/ddl/drop_columns.sql        |  2 +-
 4 files changed, 66 insertions(+), 8 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 7b72c43ed5..953bb30ee7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -529,6 +529,7 @@ public interface JdbcDialect extends Serializable {
             throws SQLException {
         String tableIdentifierWithQuoted = tableIdentifier(tablePath);
         Column addColumn = event.getColumn();
+        String afterColumn = event.getAfterColumn();
         String addColumnSQL =
                 buildAlterTableSql(
                         event.getSourceDialectName(),
@@ -536,7 +537,8 @@ public interface JdbcDialect extends Serializable {
                         AlterType.ADD.name(),
                         addColumn,
                         tableIdentifierWithQuoted,
-                        StringUtils.EMPTY);
+                        StringUtils.EMPTY,
+                        afterColumn);
         try (Statement statement = connection.createStatement()) {
             log.info("Executing add column SQL: " + addColumnSQL);
             statement.execute(addColumnSQL);
@@ -549,6 +551,7 @@ public interface JdbcDialect extends Serializable {
         String tableIdentifierWithQuoted = tableIdentifier(tablePath);
         Column changeColumn = event.getColumn();
         String oldColumnName = event.getOldColumn();
+        String afterColumn = event.getAfterColumn();
         String changeColumnSQL =
                 buildAlterTableSql(
                         event.getSourceDialectName(),
@@ -558,7 +561,8 @@ public interface JdbcDialect extends Serializable {
                                 : AlterType.CHANGE.name(),
                         changeColumn,
                         tableIdentifierWithQuoted,
-                        oldColumnName);
+                        oldColumnName,
+                        afterColumn);
 
         try (Statement statement = connection.createStatement()) {
             log.info("Executing change column SQL: " + changeColumnSQL);
@@ -571,6 +575,7 @@ public interface JdbcDialect extends Serializable {
             throws SQLException {
         String tableIdentifierWithQuoted = tableIdentifier(tablePath);
         Column modifyColumn = event.getColumn();
+        String afterColumn = event.getAfterColumn();
         String modifyColumnSQL =
                 buildAlterTableSql(
                         event.getSourceDialectName(),
@@ -578,7 +583,8 @@ public interface JdbcDialect extends Serializable {
                         AlterType.MODIFY.name(),
                         modifyColumn,
                         tableIdentifierWithQuoted,
-                        StringUtils.EMPTY);
+                        StringUtils.EMPTY,
+                        afterColumn);
 
         try (Statement statement = connection.createStatement()) {
             log.info("Executing modify column SQL: " + modifyColumnSQL);
@@ -598,7 +604,8 @@ public interface JdbcDialect extends Serializable {
                         AlterType.DROP.name(),
                         null,
                         tableIdentifierWithQuoted,
-                        dropColumn);
+                        dropColumn,
+                        null);
         try (Statement statement = connection.createStatement()) {
             log.info("Executing drop column SQL: " + dropColumnSQL);
             statement.execute(dropColumnSQL);
@@ -614,6 +621,7 @@ public interface JdbcDialect extends Serializable {
      * @param newColumn new column after ddl
      * @param tableName table name of sink table
      * @param oldColumnName old column name before ddl
+     * @param afterColumn existing column the new column is placed after; null or blank for none
      * @return alter table sql for sink table after schema change
      */
     default String buildAlterTableSql(
@@ -622,7 +630,8 @@ public interface JdbcDialect extends Serializable {
             String alterOperation,
             Column newColumn,
             String tableName,
-            String oldColumnName) {
+            String oldColumnName,
+            String afterColumn) {
         if (StringUtils.equals(alterOperation, AlterType.DROP.name())) {
             return String.format(
                     "ALTER TABLE %s drop column %s", tableName, 
quoteIdentifier(oldColumnName));
@@ -654,6 +663,7 @@ public interface JdbcDialect extends Serializable {
         }
         basicSql = decorateWithNullable(basicSql, typeBasicTypeDefine, 
sourceDialectName);
         basicSql = decorateWithComment(tableName, basicSql, 
typeBasicTypeDefine);
+        basicSql = decorateWithAfterColumn(basicSql, afterColumn);
         return dialectName().equals(DatabaseIdentifier.ORACLE) ? basicSql : 
basicSql + ";";
     }
 
@@ -779,6 +789,21 @@ public interface JdbcDialect extends Serializable {
         return sql.toString();
     }
 
+    /**
+     * Appends an {@code AFTER <column>} positioning clause to an alter table statement.
+     *
+     * @param basicSql alter table sql built so far for the sink table
+     * @param afterColumn existing column the new column is placed after; may be null or blank
+     * @return basicSql, followed by the quoted AFTER clause when afterColumn is not blank
+     */
+    default String decorateWithAfterColumn(String basicSql, String afterColumn) {
+        if (StringUtils.isBlank(afterColumn)) {
+            return basicSql;
+        }
+        // Quote the name the same way buildAlterTableSql quotes the dropped column.
+        return basicSql + "AFTER " + quoteIdentifier(afterColumn) + StringUtils.SPACE;
+    }
+
     /**
      * whether quotes with default value
      *
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
index 5cedb688d7..4d7a9ef2fd 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/AbstractJdbcSinkWriter.java
@@ -52,6 +52,7 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 @Slf4j
 public abstract class AbstractJdbcSinkWriter<ResourceT>
@@ -90,8 +91,27 @@ public abstract class AbstractJdbcSinkWriter<ResourceT>
         List<Column> columns = new ArrayList<>(tableSchema.getColumns());
         switch (event.getEventType()) {
             case SCHEMA_CHANGE_ADD_COLUMN:
-                Column addColumn = ((AlterTableAddColumnEvent) 
event).getColumn();
-                columns.add(addColumn);
+                AlterTableAddColumnEvent alterTableAddColumnEvent =
+                        (AlterTableAddColumnEvent) event;
+                Column addColumn = alterTableAddColumnEvent.getColumn();
+                String afterColumn = alterTableAddColumnEvent.getAfterColumn();
+                if (StringUtils.isNotBlank(afterColumn)) {
+                    Optional<Column> columnOptional =
+                            columns.stream()
+                                    .filter(column -> 
afterColumn.equals(column.getName()))
+                                    .findFirst();
+                    if (!columnOptional.isPresent()) {
+                        columns.add(addColumn);
+                        break;
+                    }
+                    columnOptional.ifPresent(
+                            column -> {
+                                int index = columns.indexOf(column);
+                                columns.add(index + 1, addColumn);
+                            });
+                } else {
+                    columns.add(addColumn);
+                }
                 break;
             case SCHEMA_CHANGE_DROP_COLUMN:
                 String dropColumn = ((AlterTableDropColumnEvent) 
event).getColumn();
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
index 7ec4e23e62..d08c19c8ca 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/add_columns.sql
@@ -64,6 +64,19 @@ values (128,"scooter","Small 2-wheel 
scooter",3.14,'xx',1,1.1,'2023-02-02 09:09:
        (136,"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 
09:09:09');
 update products set name = 'dailai' where id = 135;
 
+alter table products ADD COLUMN add_column6 varchar(64) not null default 'ff' 
after id;
+delete from products where id = 115;
+insert into products
+values (173,'tt',"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1,'2023-02-02 
09:09:09'),
+       (174,'tt',"car battery","12V car battery",8.1,'xx',2,1.2,'2023-02-02 
09:09:09'),
+       (175,'tt',"12-pack drill bits","12-pack of drill bits with sizes 
ranging from #40 to #3",0.8,'xx',3,1.3,'2023-02-02 09:09:09'),
+       (176,'tt',"hammer","12oz carpenter's 
hammer",0.75,'xx',4,1.4,'2023-02-02 09:09:09'),
+       (177,'tt',"hammer","14oz carpenter's 
hammer",0.875,'xx',5,1.5,'2023-02-02 09:09:09'),
+       (178,'tt',"hammer","16oz carpenter's hammer",1.0,'xx',6,1.6,'2023-02-02 
09:09:09'),
+       (179,'tt',"rocks","box of assorted rocks",5.3,'xx',7,1.7,'2023-02-02 
09:09:09'),
+       (180,'tt',"jacket","water resistent black wind 
breaker",0.1,'xx',8,1.8,'2023-02-02 09:09:09'),
+       (181,'tt',"spare tire","24 inch spare tire",22.2,'xx',9,1.9,'2023-02-02 
09:09:09');
 
 -- add column for irrelevant table
 ALTER TABLE products_on_hand ADD COLUMN add_column5 varchar(64) not null 
default 'yy';
+
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
index d6502bbfdf..5c3b7d1f54 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/drop_columns.sql
@@ -21,7 +21,7 @@
 CREATE DATABASE IF NOT EXISTS `shop`;
 use shop;
 
-alter table products drop column add_column4;
+alter table products drop column add_column4,drop column add_column6;
 insert into products
 values (137,"scooter","Small 2-wheel scooter",3.14,'xx',1,1.1),
        (138,"car battery","12V car battery",8.1,'xx',2,1.2),

Reply via email to