This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 30aa485b38 [Feature][Jdbc] Support sink ddl for sqlserver #8114 (#8936)
30aa485b38 is described below
commit 30aa485b38b454662fcef3e008912be2696df337
Author: Aiden <[email protected]>
AuthorDate: Sat Mar 15 19:45:48 2025 +0800
[Feature][Jdbc] Support sink ddl for sqlserver #8114 (#8936)
---
docs/en/concept/schema-evolution.md | 41 +++
docs/zh/concept/schema-evolution.md | 41 +++
.../sqlserver/SqlServerCreateTableSqlBuilder.java | 12 +-
.../dialect/sqlserver/SqlServerDialect.java | 282 +++++++++++++++++++++
.../connector-jdbc-e2e-ddl/pom.xml | 6 +
.../connectors/jdbc/SqlServerSchemaChangeIT.java | 160 ++++++++++++
.../mysqlcdc_to_sqlserver_with_schema_change.conf | 56 ++++
..._sqlserver_with_schema_change_exactly_once.conf | 55 ++++
8 files changed, 648 insertions(+), 5 deletions(-)
diff --git a/docs/en/concept/schema-evolution.md
b/docs/en/concept/schema-evolution.md
index 4e29c50d67..3dbdfb5867 100644
--- a/docs/en/concept/schema-evolution.md
+++ b/docs/en/concept/schema-evolution.md
@@ -23,6 +23,7 @@ Schema Evolution means that the schema of a data table can be
changed and the da
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
+[Jdbc-SqlServer](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md)
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Doris.md)
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Paimon.md#Schema-Evolution)
@@ -332,4 +333,44 @@ sink {
multi_table_sink_replica = 2
}
}
+```
+
+### Mysql-CDC -> Jdbc-SqlServer
+```hocon
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:sqlserver://e2e_sqlserver:1433"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ user = "sa"
+ password = "paanssy1234$"
+ generate_sink_sql = true
+ database = master
+ table = "dbo.sink_table_with_schema_change"
+ primary_keys = ["id"]
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
+ }
+}
```
\ No newline at end of file
diff --git a/docs/zh/concept/schema-evolution.md
b/docs/zh/concept/schema-evolution.md
index 0c793bd6bf..f76573a83b 100644
--- a/docs/zh/concept/schema-evolution.md
+++ b/docs/zh/concept/schema-evolution.md
@@ -23,6 +23,7 @@
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
+[Jdbc-SqlServer](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/StarRocks.md)
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Doris.md)
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Paimon.md#模式演变)
@@ -333,4 +334,44 @@ sink {
multi_table_sink_replica = 2
}
}
+```
+
+### Mysql-CDC -> Jdbc-SqlServer
+```hocon
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:sqlserver://e2e_sqlserver:1433"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ user = "sa"
+ password = "paanssy1234$"
+ generate_sink_sql = true
+ database = master
+ table = "dbo.sink_table_with_schema_change"
+ primary_keys = ["id"]
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
+ }
+}
```
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
index df258e364b..53ae23bb83 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java
@@ -200,12 +200,14 @@ public class SqlServerCreateTableSqlBuilder {
} else {
columnSqls.add(SqlServerTypeConverter.INSTANCE.reconvert(column).getColumnType());
}
+
// nullable
- if (column.isNullable()) {
- columnSqls.add("NULL");
- } else {
- columnSqls.add("NOT NULL");
- }
+ boolean isPrimaryKeyColumn =
+ createIndex
+ && primaryKey != null
+ &&
primaryKey.getColumnNames().contains(column.getName());
+ String nullability = (column.isNullable() && !isPrimaryKeyColumn) ?
"NULL" : "NOT NULL";
+ columnSqls.add(nullability);
// comment
if (column.getComment() != null) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index aa24681224..3a09f987a9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -17,7 +17,14 @@
package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableDropColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
@@ -35,11 +42,22 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_CHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_NCHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_NTEXT;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_NVARCHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_SQLVARIANT;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_TEXT;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_UNIQUEIDENTIFIER;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_VARCHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerTypeConverter.SQLSERVER_XML;
+
@Slf4j
public class SqlServerDialect implements JdbcDialect {
@@ -259,4 +277,268 @@ public class SqlServerDialect implements JdbcDialect {
}
}
}
+
+ @Override
+ public TypeConverter<BasicTypeDefine> getTypeConverter() {
+ return SqlServerTypeConverter.INSTANCE;
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableAddColumnEvent event)
+ throws SQLException {
+ List<String> ddlSQL = new ArrayList<>();
+ Column column = event.getColumn();
+ String sourceDialectName = event.getSourceDialectName();
+ boolean sameCatalog = StringUtils.equals(dialectName(),
sourceDialectName);
+ BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+ String columnType = sameCatalog ? column.getSourceType() :
typeDefine.getColumnType();
+
+ // Build the SQL statement that add the column
+ StringBuilder sqlBuilder =
+ buildAlterTablePrefix(tablePath)
+ .append(" ADD ")
+ .append(quoteIdentifier(column.getName()))
+ .append(" ")
+ .append(columnType)
+ .append(" ");
+
+ if (column.getDefaultValue() != null) {
+ // Handle default values
+ String defaultValueClause = sqlClauseWithDefaultValue(typeDefine,
sourceDialectName);
+ sqlBuilder.append(defaultValueClause);
+ }
+
+ if (!column.isNullable()) {
+ // Handle null constraints
+ sqlBuilder.append(" NOT NULL");
+ }
+
+ ddlSQL.add(sqlBuilder.toString());
+ // Process column comment
+ if (column.getComment() != null) {
+ ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+ }
+
+ // Execute the DDL statement
+ executeDDL(connection, ddlSQL);
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableChangeColumnEvent event)
+ throws SQLException {
+ List<String> ddlSQL = new ArrayList<>();
+ if (event.getOldColumn() != null
+ &&
!(event.getColumn().getName().equals(event.getOldColumn()))) {
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("EXEC sp_rename ")
+ .append(
+ String.format(
+ "'%s.%s.%s.%s', ",
+ tablePath.getDatabaseName(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName(),
+ event.getOldColumn()))
+ .append(String.format("'%s', 'COLUMN';",
event.getColumn().getName()));
+ ddlSQL.add(sqlBuilder.toString());
+ }
+
+ executeDDL(connection, ddlSQL);
+
+ if (event.getColumn().getDataType() != null) {
+ applySchemaChange(
+ connection,
+ tablePath,
+
AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn()));
+ }
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableModifyColumnEvent event)
+ throws SQLException {
+ Column column = event.getColumn();
+ String sourceDialectName = event.getSourceDialectName();
+ boolean sameCatalog = StringUtils.equals(dialectName(),
sourceDialectName);
+ BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+ String columnType = sameCatalog ? column.getSourceType() :
typeDefine.getColumnType();
+ List<String> ddlSQL = new ArrayList<>();
+ // Handle field default constraints.
+ if (column.getDefaultValue() != null) {
+ if (sameCatalog
+ || !isSpecialDefaultValue(typeDefine.getDefaultValue(),
sourceDialectName)) {
+ String constraintQuery =
+ String.format(
+ "SELECT dc.name AS constraint_name\n"
+ + "FROM sys.default_constraints dc \n"
+ + "JOIN sys.columns c ON
dc.parent_object_id = c.object_id AND dc.parent_column_id = c.column_id \n"
+ + "JOIN sys.tables t ON c.object_id =
t.object_id \n"
+ + "JOIN sys.schemas s ON t.schema_id =
s.schema_id \n"
+ + "WHERE t.name = '%s' AND s.name =
'%s' AND c.name = '%s';",
+ tablePath.getTableName(),
+ tablePath.getSchemaName(),
+ event.getColumn().getName());
+
+ try (Statement stmt = connection.createStatement();
+ ResultSet rs = stmt.executeQuery(constraintQuery)) {
+ while (rs.next()) {
+ String constraintName = rs.getString(1);
+ if (StringUtils.isBlank(constraintName)) {
+ continue;
+ }
+ StringBuilder dropConstraintSQL =
+ buildAlterTablePrefix(tablePath)
+ .append(" DROP CONSTRAINT ")
+
.append(quoteIdentifier(constraintName));
+ ddlSQL.add(dropConstraintSQL.toString());
+ }
+ }
+
+ // Process column default
+ String defaultValueClause =
+ sqlClauseWithDefaultValue(typeDefine,
sourceDialectName);
+ if (StringUtils.isNotBlank(defaultValueClause)) {
+ StringBuilder defaultSqlBuilder =
+ buildAlterTablePrefix(tablePath)
+ .append(" ADD ")
+ .append(defaultValueClause)
+ .append(" FOR ")
+ .append(quoteIdentifier(column.getName()));
+ ddlSQL.add(defaultSqlBuilder.toString());
+ }
+ } else {
+ log.warn(
+ "Skipping unsupported default value for column {} in
table {}.",
+ column.getName(),
+ tablePath.getFullName());
+ }
+ }
+
+ // Process column comment
+ if (column.getComment() != null) {
+ ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+ }
+
+ // Build the SQL statement that modifies the column
+ StringBuilder sqlBuilder =
+ buildAlterTablePrefix(tablePath)
+ .append(" ALTER COLUMN ")
+ .append(quoteIdentifier(column.getName()))
+ .append(" ")
+ .append(columnType);
+ boolean targetColumnNullable = columnIsNullable(connection, tablePath,
column.getName());
+ if (column.isNullable() != targetColumnNullable &&
!targetColumnNullable) {
+ sqlBuilder.append(" NULL ");
+ }
+ ddlSQL.add(sqlBuilder.toString());
+
+ // Execute the DDL statement
+ executeDDL(connection, ddlSQL);
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableDropColumnEvent event)
+ throws SQLException {
+ // Handle field`s constraints.
+ String constraintQuery =
+ String.format(
+ "SELECT dc.name AS constraint_name\n"
+ + "FROM sys.default_constraints dc \n"
+ + "JOIN sys.columns c ON dc.parent_object_id =
c.object_id AND dc.parent_column_id = c.column_id \n"
+ + "JOIN sys.tables t ON c.object_id =
t.object_id \n"
+ + "JOIN sys.schemas s ON t.schema_id =
s.schema_id \n"
+ + "WHERE t.name = '%s' AND c.name = '%s' and
s.name = '%s';",
+ tablePath.getTableName(), event.getColumn(),
tablePath.getSchemaName());
+
+ try (Statement stmt = connection.createStatement();
+ ResultSet rs = stmt.executeQuery(constraintQuery)) {
+ while (rs.next()) {
+ String constraintName = rs.getString(1);
+ String dropConstraintSQL =
+ String.format(
+ "ALTER TABLE %s DROP CONSTRAINT %s",
+ tableIdentifier(tablePath),
quoteIdentifier(constraintName));
+ try (Statement dropStmt = connection.createStatement()) {
+ log.info("Executing drop constraint SQL: {}",
dropConstraintSQL);
+ dropStmt.execute(dropConstraintSQL);
+ }
+ }
+ }
+
+ String dropColumnSQL =
+ String.format(
+ "ALTER TABLE %s DROP COLUMN %s",
+ tableIdentifier(tablePath),
quoteIdentifier(event.getColumn()));
+ try (Statement statement = connection.createStatement()) {
+ log.info("Executing drop column SQL: {}", dropColumnSQL);
+ statement.execute(dropColumnSQL);
+ }
+ }
+
+ @Override
+ public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
+ String sqlServerType = columnDefine.getDataType();
+ switch (sqlServerType) {
+ case SQLSERVER_CHAR:
+ case SQLSERVER_VARCHAR:
+ case SQLSERVER_NCHAR:
+ case SQLSERVER_NVARCHAR:
+ case SQLSERVER_TEXT:
+ case SQLSERVER_NTEXT:
+ case SQLSERVER_XML:
+ case SQLSERVER_UNIQUEIDENTIFIER:
+ case SQLSERVER_SQLVARIANT:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private void executeDDL(Connection connection, List<String> ddlSQL) throws
SQLException {
+ try (Statement statement = connection.createStatement()) {
+ for (String sql : ddlSQL) {
+ log.info("Executing SqlServer SQL: {}", sql);
+ statement.execute(sql);
+ }
+ } catch (SQLException e) {
+ throw new SQLException("Error executing SqlServer SQL: " + ddlSQL,
e.getSQLState(), e);
+ }
+ }
+
+ private String buildColumnCommentSQL(TablePath tablePath, Column column) {
+ return String.format(
+ "EXEC %s.sys.sp_updateextendedproperty 'MS_Description',
N'%s', 'schema', N'%s', "
+ + "'table', N'%s', 'column', N'%s';",
+ tablePath.getDatabaseName(),
+ column.getComment(),
+ tablePath.getSchemaName(),
+ tablePath.getTableName(),
+ column.getName());
+ }
+
+ private boolean columnIsNullable(Connection connection, TablePath
tablePath, String column)
+ throws SQLException {
+ String selectColumnSQL =
+ String.format(
+ "SELECT IS_NULLABLE FROM information_schema.COLUMNS
WHERE %s AND COLUMN_NAME = '%s';",
+ buildCommonWhereClause(tablePath), column);
+ try (Statement statement = connection.createStatement()) {
+ ResultSet rs = statement.executeQuery(selectColumnSQL);
+ rs.next();
+ return rs.getString("IS_NULLABLE").equals("YES");
+ }
+ }
+
+ private StringBuilder buildAlterTablePrefix(TablePath tablePath) {
+ return new StringBuilder("ALTER TABLE
").append(tableIdentifier(tablePath));
+ }
+
+ private String buildCommonWhereClause(TablePath tablePath) {
+ return String.format(
+ "TABLE_CATALOG = '%s' AND TABLE_SCHEMA = '%s' AND TABLE_NAME =
'%s'",
+ tablePath.getDatabaseName(), tablePath.getSchemaName(),
tablePath.getTableName());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
index 711953ece0..5a3eb97b07 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
@@ -87,6 +87,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.microsoft.sqlserver</groupId>
+ <artifactId>mssql-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- jdbc sink container image-->
<dependency>
<groupId>org.testcontainers</groupId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SqlServerSchemaChangeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SqlServerSchemaChangeIT.java
new file mode 100644
index 0000000000..5fdc075f95
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SqlServerSchemaChangeIT.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Duration;
+
+@Slf4j
+public class SqlServerSchemaChangeIT extends AbstractSchemaChangeBaseIT {
+
+ private static final String DATABASE_TYPE = "SqlServer";
+ private static final String SQLSERVER_IMAGE =
"mcr.microsoft.com/mssql/server:2022-latest";
+ private static final String SQLSERVER_CONTAINER_HOST = "sqlserver";
+ private static final String SQLSERVER_DATABASE = "master";
+ private static final String SQLSERVER_SCHEMA = "dbo";
+ private static final String SQLSERVER_USER = "sa";
+ private static final String ACCEPT_EULA = "ACCEPT_EULA";
+ private static final String Y = "Y";
+ private static final String SA_PASSWORD = "SA_PASSWORD";
+ private static final String SQLSERVER_PASSWORD = "paanssy1234$";
+ private static final int SQLSERVER_PORT = 1433;
+ private static final int SQLSERVER_XA_PORT = 5022;
+ private final String SQLSERVER_JDBC_URL =
+ "jdbc:sqlserver://%s:%s;databaseName=%s;"
+ +
"useBulkCopyForBatchInsert=true;delayLoadingLobs=true;useFmtOnly=false;"
+ + "integratedSecurity=false;xaTransactionCompatible=true;"
+ + "encrypt=false;trustServerCertificate=true;";
+ private static final String DRIVER_CLASS =
"com.microsoft.sqlserver.jdbc.SQLServerDriver";
+ private static final String SQLSERVER_DRIVER_JAR =
+
"https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.2.1.jre8/mssql-jdbc-9.2.1.jre8.jar";
+ private final String schemaEvolutionCaseConfig =
+ "/mysqlcdc_to_sqlserver_with_schema_change.conf";
+ private final String schemaEvolutionCaseExactlyOnceConfig =
+ "/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf";
+ private final String QUERY_COLUMNS =
+ "SELECT REPLACE(REPLACE(COLUMN_NAME, '[', ''), ']', '')
COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '%s' AND
TABLE_NAME = '%s' ORDER BY COLUMN_NAME";
+
+ @Override
+ protected SchemaChangeCase getSchemaChangeCase() {
+ return SchemaChangeCase.builder()
+ .jdbcUrl(SQLSERVER_JDBC_URL)
+ .username(SQLSERVER_USER)
+ .password(SQLSERVER_PASSWORD)
+ .driverUrl(SQLSERVER_DRIVER_JAR)
+ .port(SQLSERVER_PORT)
+ .driverClassName(DRIVER_CLASS)
+ .databaseName(SQLSERVER_DATABASE)
+ .schemaName(SQLSERVER_SCHEMA)
+ .schemaEvolutionCase(schemaEvolutionCaseConfig)
+
.schemaEvolutionCaseExactlyOnce(schemaEvolutionCaseExactlyOnceConfig)
+ .sinkTable1(SINK_TABLE1)
+ .sinkTable2(SINK_TABLE2)
+ .sinkQueryColumns(QUERY_COLUMNS)
+ .openExactlyOnce(true)
+ .build();
+ }
+
+ @Override
+ protected GenericContainer initSinkContainer() {
+ GenericContainer<?> container =
+ new GenericContainer<>(SQLSERVER_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(SQLSERVER_CONTAINER_HOST)
+ .withEnv(ACCEPT_EULA, Y)
+ .withEnv(SA_PASSWORD, SQLSERVER_PASSWORD)
+ .withEnv("MSSQL_ENABLE_HADR", "1")
+ .withEnv("MSSQL_AGENT_ENABLED", "1")
+ .withExposedPorts(SQLSERVER_PORT, SQLSERVER_XA_PORT)
+ .waitingFor(
+ Wait.forLogMessage(
+ ".*SQL Server is now ready for client
connections.*\\n", 1))
+ .withStartupTimeout(Duration.ofMinutes(10))
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+
DockerLoggerFactory.getLogger(SQLSERVER_IMAGE)));
+
+ container.setPortBindings(
+ Lists.newArrayList(
+ String.format("%d:%d", SQLSERVER_PORT, SQLSERVER_PORT),
+ String.format("%d:%d", SQLSERVER_XA_PORT,
SQLSERVER_XA_PORT)));
+
+ container.start();
+ try {
+ // This set of commands prepares for the subsequent enabling of
the external user
+ // enabled configuration (for XA transaction support)
+ container.execInContainer(
+ "/opt/mssql-tools18/bin/sqlcmd",
+ "-S",
+ "localhost",
+ "-U",
+ SQLSERVER_USER,
+ "-P",
+ SQLSERVER_PASSWORD,
+ "-Q",
+ "EXEC sp_configure 'show advanced options', 1;
RECONFIGURE;",
+ "-C");
+
+ // Enable external user access permissions, which is a requirement
for SQL Server to
+ // support XA distributed transactions.
+ container.execInContainer(
+ "/opt/mssql-tools18/bin/sqlcmd",
+ "-S",
+ "localhost",
+ "-U",
+ SQLSERVER_USER,
+ "-P",
+ SQLSERVER_PASSWORD,
+ "-Q",
+ "EXEC sp_configure 'external user enabled', 1;
RECONFIGURE;",
+ "-C");
+
+ log.info("Installing stored procedures sp_sqljdbc_xa_install.");
+ container.execInContainer(
+ "/opt/mssql-tools18/bin/sqlcmd",
+ "-S",
+ "localhost",
+ "-U",
+ SQLSERVER_USER,
+ "-P",
+ SQLSERVER_PASSWORD,
+ "-Q",
+ "IF NOT EXISTS (SELECT * FROM sys.objects WHERE name =
'xp_sqljdbc_xa_init_ex') "
+ + "EXEC sp_sqljdbc_xa_install",
+ "-C");
+ } catch (IOException | InterruptedException e) {
+ log.error("XA procedure installation failed: ", e);
+ throw new RuntimeException(e);
+ }
+ return container;
+ }
+
+ @Override
+ protected String sinkDatabaseType() {
+ return DATABASE_TYPE;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change.conf
new file mode 100644
index 0000000000..789151028d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:sqlserver://sqlserver:1433"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ user = "sa"
+ password = "paanssy1234$"
+ generate_sink_sql = true
+ database = master
+ table = "dbo.sink_table_with_schema_change"
+ primary_keys = ["id"]
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf
new file mode 100644
index 0000000000..2053cfacac
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_sqlserver_with_schema_change_exactly_once.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:sqlserver://sqlserver:1433"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ user = "sa"
+ password = "paanssy1234$"
+ generate_sink_sql = true
+ database = master
+ table = "dbo.sink_table_with_schema_change_exactly_once"
+ primary_keys = ["id"]
+ is_exactly_once = true
+ xa_data_source_class_name =
"com.microsoft.sqlserver.jdbc.SQLServerXADataSource"
+ }
+}