This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 30aa485b38 [Feature][Jdbc] Support sink ddl for sqlserver #8114 (#8936)
30aa485b38 is described below

commit 30aa485b38b454662fcef3e008912be2696df337
Author: Aiden <[email protected]>
AuthorDate: Sat Mar 15 19:45:48 2025 +0800

    [Feature][Jdbc] Support sink ddl for sqlserver #8114 (#8936)
---
 docs/en/concept/schema-evolution.md                |  41 +++
 docs/zh/concept/schema-evolution.md                |  41 +++
 .../sqlserver/SqlServerCreateTableSqlBuilder.java  |  12 +-
 .../dialect/sqlserver/SqlServerDialect.java        | 282 +++++++++++++++++++++
 .../connector-jdbc-e2e-ddl/pom.xml                 |   6 +
 .../connectors/jdbc/SqlServerSchemaChangeIT.java   | 160 ++++++++++++
 .../mysqlcdc_to_sqlserver_with_schema_change.conf  |  56 ++++
 ..._sqlserver_with_schema_change_exactly_once.conf |  55 ++++
 8 files changed, 648 insertions(+), 5 deletions(-)

diff --git a/docs/en/concept/schema-evolution.md 
b/docs/en/concept/schema-evolution.md
index 4e29c50d67..3dbdfb5867 100644
--- a/docs/en/concept/schema-evolution.md
+++ b/docs/en/concept/schema-evolution.md
@@ -23,6 +23,7 @@ Schema Evolution means that the schema of a data table can be 
changed and the da
 
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
 
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
 
[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
+[Jdbc-SqlServer](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
 
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md)
 
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Doris.md)
 
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Paimon.md#Schema-Evolution)
@@ -332,4 +333,44 @@ sink {
     multi_table_sink_replica = 2
   }
 }
+```
+
+### Mysql-CDC -> Jdbc-SqlServer
+```hocon
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:sqlserver://e2e_sqlserver:1433"
+    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    user = "sa"
+    password = "paanssy1234$"
+    generate_sink_sql = true
+    database = master
+    table = "dbo.sink_table_with_schema_change"
+    primary_keys = ["id"]
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
+  }
+}
 ```
\ No newline at end of file
diff --git a/docs/zh/concept/schema-evolution.md 
b/docs/zh/concept/schema-evolution.md
index 0c793bd6bf..f76573a83b 100644
--- a/docs/zh/concept/schema-evolution.md
+++ b/docs/zh/concept/schema-evolution.md
@@ -23,6 +23,7 @@
 
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
 
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
 
[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
+[Jdbc-SqlServer](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
 
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/StarRocks.md)
 
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Doris.md)
 
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Paimon.md#模式演变)
@@ -333,4 +334,44 @@ sink {
     multi_table_sink_replica = 2
   }
 }
+```
+
+### Mysql-CDC -> Jdbc-SqlServer
+```hocon
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:sqlserver://e2e_sqlserver:1433"
+    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    user = "sa"
+    password = "paanssy1234$"
+    generate_sink_sql = true
+    database = master
+    table = "dbo.sink_table_with_schema_change"
+    primary_keys = ["id"]
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
+  }
+}
 ```
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
index df258e364b..53ae23bb83 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
@@ -200,12 +200,14 @@ public class SqlServerCreateTableSqlBuilder {
         } else {
             
columnSqls.add(SqlServerTypeConverter.INSTANCE.reconvert(column).getColumnType());
         }
+
         // nullable
-        if (column.isNullable()) {
-            columnSqls.add("NULL");
-        } else {
-            columnSqls.add("NOT NULL");
-        }
+        boolean isPrimaryKeyColumn =
+                createIndex
+                        && primaryKey != null
+                        && 
primaryKey.getColumnNames().contains(column.getName());
+        String nullability = (column.isNullable() && !isPrimaryKeyColumn) ? 
"NULL" : "NOT NULL";
+        columnSqls.add(nullability);
 
         // comment
         if (column.getComment() != null) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index aa24681224..3a09f987a9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -17,7 +17,14 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -35,11 +42,22 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_CHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_NCHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_NTEXT;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_NVARCHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_SQLVARIANT;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_TEXT;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_UNIQUEIDENTIFIER;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_VARCHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_XML;
+
 @Slf4j
 public class SqlServerDialect implements JdbcDialect {
 
@@ -259,4 +277,268 @@ public class SqlServerDialect implements JdbcDialect {
             }
         }
     }
+
+    @Override
+    public TypeConverter<BasicTypeDefine> getTypeConverter() {
+        return SqlServerTypeConverter.INSTANCE;
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableAddColumnEvent event)
+            throws SQLException {
+        List<String> ddlSQL = new ArrayList<>();
+        Column column = event.getColumn();
+        String sourceDialectName = event.getSourceDialectName();
+        boolean sameCatalog = StringUtils.equals(dialectName(), 
sourceDialectName);
+        BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+        String columnType = sameCatalog ? column.getSourceType() : 
typeDefine.getColumnType();
+
+        // Build the SQL statement that add the column
+        StringBuilder sqlBuilder =
+                buildAlterTablePrefix(tablePath)
+                        .append(" ADD ")
+                        .append(quoteIdentifier(column.getName()))
+                        .append(" ")
+                        .append(columnType)
+                        .append(" ");
+
+        if (column.getDefaultValue() != null) {
+            // Handle default values
+            String defaultValueClause = sqlClauseWithDefaultValue(typeDefine, 
sourceDialectName);
+            sqlBuilder.append(defaultValueClause);
+        }
+
+        if (!column.isNullable()) {
+            // Handle null constraints
+            sqlBuilder.append(" NOT NULL");
+        }
+
+        ddlSQL.add(sqlBuilder.toString());
+        // Process column comment
+        if (column.getComment() != null) {
+            ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+        }
+
+        // Execute the DDL statement
+        executeDDL(connection, ddlSQL);
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableChangeColumnEvent event)
+            throws SQLException {
+        List<String> ddlSQL = new ArrayList<>();
+        if (event.getOldColumn() != null
+                && 
!(event.getColumn().getName().equals(event.getOldColumn()))) {
+            StringBuilder sqlBuilder =
+                    new StringBuilder()
+                            .append("EXEC sp_rename ")
+                            .append(
+                                    String.format(
+                                            "'%s.%s.%s.%s', ",
+                                            tablePath.getDatabaseName(),
+                                            tablePath.getSchemaName(),
+                                            tablePath.getTableName(),
+                                            event.getOldColumn()))
+                            .append(String.format("'%s', 'COLUMN';", 
event.getColumn().getName()));
+            ddlSQL.add(sqlBuilder.toString());
+        }
+
+        executeDDL(connection, ddlSQL);
+
+        if (event.getColumn().getDataType() != null) {
+            applySchemaChange(
+                    connection,
+                    tablePath,
+                    
AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn()));
+        }
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableModifyColumnEvent event)
+            throws SQLException {
+        Column column = event.getColumn();
+        String sourceDialectName = event.getSourceDialectName();
+        boolean sameCatalog = StringUtils.equals(dialectName(), 
sourceDialectName);
+        BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+        String columnType = sameCatalog ? column.getSourceType() : 
typeDefine.getColumnType();
+        List<String> ddlSQL = new ArrayList<>();
+        // Handle field default constraints.
+        if (column.getDefaultValue() != null) {
+            if (sameCatalog
+                    || !isSpecialDefaultValue(typeDefine.getDefaultValue(), 
sourceDialectName)) {
+                String constraintQuery =
+                        String.format(
+                                "SELECT dc.name AS constraint_name\n"
+                                        + "FROM sys.default_constraints dc \n"
+                                        + "JOIN sys.columns c ON 
dc.parent_object_id = c.object_id AND dc.parent_column_id = c.column_id \n"
+                                        + "JOIN sys.tables t ON c.object_id = 
t.object_id \n"
+                                        + "JOIN sys.schemas s ON t.schema_id = 
s.schema_id \n"
+                                        + "WHERE t.name = '%s' AND s.name = 
'%s' AND c.name = '%s';",
+                                tablePath.getTableName(),
+                                tablePath.getSchemaName(),
+                                event.getColumn().getName());
+
+                try (Statement stmt = connection.createStatement();
+                        ResultSet rs = stmt.executeQuery(constraintQuery)) {
+                    while (rs.next()) {
+                        String constraintName = rs.getString(1);
+                        if (StringUtils.isBlank(constraintName)) {
+                            continue;
+                        }
+                        StringBuilder dropConstraintSQL =
+                                buildAlterTablePrefix(tablePath)
+                                        .append(" DROP CONSTRAINT ")
+                                        
.append(quoteIdentifier(constraintName));
+                        ddlSQL.add(dropConstraintSQL.toString());
+                    }
+                }
+
+                // Process column default
+                String defaultValueClause =
+                        sqlClauseWithDefaultValue(typeDefine, 
sourceDialectName);
+                if (StringUtils.isNotBlank(defaultValueClause)) {
+                    StringBuilder defaultSqlBuilder =
+                            buildAlterTablePrefix(tablePath)
+                                    .append(" ADD ")
+                                    .append(defaultValueClause)
+                                    .append(" FOR ")
+                                    .append(quoteIdentifier(column.getName()));
+                    ddlSQL.add(defaultSqlBuilder.toString());
+                }
+            } else {
+                log.warn(
+                        "Skipping unsupported default value for column {} in 
table {}.",
+                        column.getName(),
+                        tablePath.getFullName());
+            }
+        }
+
+        // Process column comment
+        if (column.getComment() != null) {
+            ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+        }
+
+        // Build the SQL statement that modifies the column
+        StringBuilder sqlBuilder =
+                buildAlterTablePrefix(tablePath)
+                        .append(" ALTER COLUMN ")
+                        .append(quoteIdentifier(column.getName()))
+                        .append(" ")
+                        .append(columnType);
+        boolean targetColumnNullable = columnIsNullable(connection, tablePath, 
column.getName());
+        if (column.isNullable() != targetColumnNullable && 
!targetColumnNullable) {
+            sqlBuilder.append(" NULL ");
+        }
+        ddlSQL.add(sqlBuilder.toString());
+
+        // Execute the DDL statement
+        executeDDL(connection, ddlSQL);
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableDropColumnEvent event)
+            throws SQLException {
+        // Handle field`s constraints.
+        String constraintQuery =
+                String.format(
+                        "SELECT dc.name AS constraint_name\n"
+                                + "FROM sys.default_constraints dc \n"
+                                + "JOIN sys.columns c ON dc.parent_object_id = 
c.object_id AND dc.parent_column_id = c.column_id \n"
+                                + "JOIN sys.tables t ON c.object_id = 
t.object_id \n"
+                                + "JOIN sys.schemas s ON t.schema_id = 
s.schema_id \n"
+                                + "WHERE t.name = '%s' AND c.name = '%s' and 
s.name = '%s';",
+                        tablePath.getTableName(), event.getColumn(), 
tablePath.getSchemaName());
+
+        try (Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery(constraintQuery)) {
+            while (rs.next()) {
+                String constraintName = rs.getString(1);
+                String dropConstraintSQL =
+                        String.format(
+                                "ALTER TABLE %s DROP CONSTRAINT %s",
+                                tableIdentifier(tablePath), 
quoteIdentifier(constraintName));
+                try (Statement dropStmt = connection.createStatement()) {
+                    log.info("Executing drop constraint SQL: {}", 
dropConstraintSQL);
+                    dropStmt.execute(dropConstraintSQL);
+                }
+            }
+        }
+
+        String dropColumnSQL =
+                String.format(
+                        "ALTER TABLE %s DROP COLUMN %s",
+                        tableIdentifier(tablePath), 
quoteIdentifier(event.getColumn()));
+        try (Statement statement = connection.createStatement()) {
+            log.info("Executing drop column SQL: {}", dropColumnSQL);
+            statement.execute(dropColumnSQL);
+        }
+    }
+
+    @Override
+    public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
+        String sqlServerType = columnDefine.getDataType();
+        switch (sqlServerType) {
+            case SQLSERVER_CHAR:
+            case SQLSERVER_VARCHAR:
+            case SQLSERVER_NCHAR:
+            case SQLSERVER_NVARCHAR:
+            case SQLSERVER_TEXT:
+            case SQLSERVER_NTEXT:
+            case SQLSERVER_XML:
+            case SQLSERVER_UNIQUEIDENTIFIER:
+            case SQLSERVER_SQLVARIANT:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private void executeDDL(Connection connection, List<String> ddlSQL) throws 
SQLException {
+        try (Statement statement = connection.createStatement()) {
+            for (String sql : ddlSQL) {
+                log.info("Executing SqlServer SQL: {}", sql);
+                statement.execute(sql);
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Error executing SqlServer SQL: " + ddlSQL, 
e.getSQLState(), e);
+        }
+    }
+
+    private String buildColumnCommentSQL(TablePath tablePath, Column column) {
+        return String.format(
+                "EXEC %s.sys.sp_updateextendedproperty 'MS_Description', 
N'%s', 'schema', N'%s', "
+                        + "'table', N'%s', 'column', N'%s';",
+                tablePath.getDatabaseName(),
+                column.getComment(),
+                tablePath.getSchemaName(),
+                tablePath.getTableName(),
+                column.getName());
+    }
+
+    private boolean columnIsNullable(Connection connection, TablePath 
tablePath, String column)
+            throws SQLException {
+        String selectColumnSQL =
+                String.format(
+                        "SELECT IS_NULLABLE FROM information_schema.COLUMNS 
WHERE %s AND COLUMN_NAME = '%s';",
+                        buildCommonWhereClause(tablePath), column);
+        try (Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery(selectColumnSQL);
+            rs.next();
+            return rs.getString("IS_NULLABLE").equals("YES");
+        }
+    }
+
+    private StringBuilder buildAlterTablePrefix(TablePath tablePath) {
+        return new StringBuilder("ALTER TABLE 
").append(tableIdentifier(tablePath));
+    }
+
+    private String buildCommonWhereClause(TablePath tablePath) {
+        return String.format(
+                "TABLE_CATALOG = '%s' AND TABLE_SCHEMA = '%s' AND TABLE_NAME = 
'%s'",
+                tablePath.getDatabaseName(), tablePath.getSchemaName(), 
tablePath.getTableName());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
index 711953ece0..5a3eb97b07 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
@@ -87,6 +87,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.microsoft.sqlserver</groupId>
+            <artifactId>mssql-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- jdbc sink container image-->
         <dependency>
             <groupId>org.testcontainers</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SqlServerSchemaChangeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SqlServerSchemaChangeIT.java
new file mode 100644
index 0000000000..5fdc075f95
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SqlServerSchemaChangeIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Duration;
+
+@Slf4j
+public class SqlServerSchemaChangeIT extends AbstractSchemaChangeBaseIT {
+
+    private static final String DATABASE_TYPE = "SqlServer";
+    private static final String SQLSERVER_IMAGE = 
"mcr.microsoft.com/mssql/server:2022-latest";
+    private static final String SQLSERVER_CONTAINER_HOST = "sqlserver";
+    private static final String SQLSERVER_DATABASE = "master";
+    private static final String SQLSERVER_SCHEMA = "dbo";
+    private static final String SQLSERVER_USER = "sa";
+    private static final String ACCEPT_EULA = "ACCEPT_EULA";
+    private static final String Y = "Y";
+    private static final String SA_PASSWORD = "SA_PASSWORD";
+    private static final String SQLSERVER_PASSWORD = "paanssy1234$";
+    private static final int SQLSERVER_PORT = 1433;
+    private static final int SQLSERVER_XA_PORT = 5022;
+    private final String SQLSERVER_JDBC_URL =
+            "jdbc:sqlserver://%s:%s;databaseName=%s;"
+                    + 
"useBulkCopyForBatchInsert=true;delayLoadingLobs=true;useFmtOnly=false;"
+                    + "integratedSecurity=false;xaTransactionCompatible=true;"
+                    + "encrypt=false;trustServerCertificate=true;";
+    private static final String DRIVER_CLASS = 
"com.microsoft.sqlserver.jdbc.SQLServerDriver";
+    private static final String SQLSERVER_DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.2.1.jre8/mssql-jdbc-9.2.1.jre8.jar";;
+    private final String schemaEvolutionCaseConfig =
+            "/mysqlcdc_to_sqlserver_with_schema_change.conf";
+    private final String schemaEvolutionCaseExactlyOnceConfig =
+            "/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf";
+    private final String QUERY_COLUMNS =
+            "SELECT REPLACE(REPLACE(COLUMN_NAME, '[', ''), ']', '') 
COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND 
TABLE_NAME = '%s' ORDER BY COLUMN_NAME";
+
+    @Override
+    protected SchemaChangeCase getSchemaChangeCase() {
+        return SchemaChangeCase.builder()
+                .jdbcUrl(SQLSERVER_JDBC_URL)
+                .username(SQLSERVER_USER)
+                .password(SQLSERVER_PASSWORD)
+                .driverUrl(SQLSERVER_DRIVER_JAR)
+                .port(SQLSERVER_PORT)
+                .driverClassName(DRIVER_CLASS)
+                .databaseName(SQLSERVER_DATABASE)
+                .schemaName(SQLSERVER_SCHEMA)
+                .schemaEvolutionCase(schemaEvolutionCaseConfig)
+                
.schemaEvolutionCaseExactlyOnce(schemaEvolutionCaseExactlyOnceConfig)
+                .sinkTable1(SINK_TABLE1)
+                .sinkTable2(SINK_TABLE2)
+                .sinkQueryColumns(QUERY_COLUMNS)
+                .openExactlyOnce(true)
+                .build();
+    }
+
+    @Override
+    protected GenericContainer initSinkContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(SQLSERVER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(SQLSERVER_CONTAINER_HOST)
+                        .withEnv(ACCEPT_EULA, Y)
+                        .withEnv(SA_PASSWORD, SQLSERVER_PASSWORD)
+                        .withEnv("MSSQL_ENABLE_HADR", "1")
+                        .withEnv("MSSQL_AGENT_ENABLED", "1")
+                        .withExposedPorts(SQLSERVER_PORT, SQLSERVER_XA_PORT)
+                        .waitingFor(
+                                Wait.forLogMessage(
+                                        ".*SQL Server is now ready for client 
connections.*\\n", 1))
+                        .withStartupTimeout(Duration.ofMinutes(10))
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(SQLSERVER_IMAGE)));
+
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%d:%d", SQLSERVER_PORT, SQLSERVER_PORT),
+                        String.format("%d:%d", SQLSERVER_XA_PORT, 
SQLSERVER_XA_PORT)));
+
+        container.start();
+        try {
+            // This set of commands prepares for the subsequent enabling of 
the external user
+            // enabled configuration (for XA transaction support)
+            container.execInContainer(
+                    "/opt/mssql-tools18/bin/sqlcmd",
+                    "-S",
+                    "localhost",
+                    "-U",
+                    SQLSERVER_USER,
+                    "-P",
+                    SQLSERVER_PASSWORD,
+                    "-Q",
+                    "EXEC sp_configure 'show advanced options', 1; 
RECONFIGURE;",
+                    "-C");
+
+            // Enable external user access permissions, which is a requirement 
for SQL Server to
+            // support XA distributed transactions.
+            container.execInContainer(
+                    "/opt/mssql-tools18/bin/sqlcmd",
+                    "-S",
+                    "localhost",
+                    "-U",
+                    SQLSERVER_USER,
+                    "-P",
+                    SQLSERVER_PASSWORD,
+                    "-Q",
+                    "EXEC sp_configure 'external user enabled', 1; 
RECONFIGURE;",
+                    "-C");
+
+            log.info("Installing stored procedures sp_sqljdbc_xa_install.");
+            container.execInContainer(
+                    "/opt/mssql-tools18/bin/sqlcmd",
+                    "-S",
+                    "localhost",
+                    "-U",
+                    SQLSERVER_USER,
+                    "-P",
+                    SQLSERVER_PASSWORD,
+                    "-Q",
+                    "IF NOT EXISTS (SELECT * FROM sys.objects WHERE name = 
'xp_sqljdbc_xa_init_ex') "
+                            + "EXEC sp_sqljdbc_xa_install",
+                    "-C");
+        } catch (IOException | InterruptedException e) {
+            log.error("XA procedure installation failed: ", e);
+            throw new RuntimeException(e);
+        }
+        return container;
+    }
+
+    @Override
+    protected String sinkDatabaseType() {
+        return DATABASE_TYPE;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change.conf
new file mode 100644
index 0000000000..789151028d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:sqlserver://sqlserver:1433"
+    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    user = "sa"
+    password = "paanssy1234$"
+    generate_sink_sql = true
+    database = master
+    table = "dbo.sink_table_with_schema_change"
+    primary_keys = ["id"]
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf
new file mode 100644
index 0000000000..2053cfacac
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:sqlserver://sqlserver:1433"
+    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    user = "sa"
+    password = "paanssy1234$"
+    generate_sink_sql = true
+    database = master
+    table = "dbo.sink_table_with_schema_change_exactly_once"
+    primary_keys = ["id"]
+    is_exactly_once = true
+    xa_data_source_class_name = 
"com.microsoft.sqlserver.jdbc.SQLServerXADataSource"
+  }
+}


Reply via email to