This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5ff3427428 [Feature][Jdbc] Support sink ddl for dameng (#8380)
5ff3427428 is described below

commit 5ff3427428bf40d041c420dc693d9734e2ed56c0
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Feb 13 14:08:59 2025 +0800

    [Feature][Jdbc] Support sink ddl for dameng (#8380)
---
 .github/workflows/backend.yml                      |  28 +++
 docs/en/concept/schema-evolution.md                |  49 ++++-
 docs/zh/concept/schema-evolution.md                |  49 ++++-
 .../schema/event/AlterTableModifyColumnEvent.java  |   5 +
 .../handler/AlterTableSchemaEventHandler.java      |  19 +-
 .../jdbc/internal/dialect/dm/DmdbDialect.java      | 229 +++++++++++++++++++++
 .../connector-jdbc-e2e-ddl/pom.xml                 |  22 +-
 .../jdbc/AbstractSchemaChangeBaseIT.java           |  39 +++-
 .../connectors/jdbc/DmSchemaChangeIT.java          |  84 ++++++++
 .../connectors/jdbc/PostgresSchemaChangeIT.java    |   6 +-
 .../connectors/jdbc/SchemaChangeCase.java          |   2 -
 .../src/test/resources/ddl/modify_columns.sql      |   2 +-
 .../src/test/resources/ddl/shop.sql                |   6 +-
 .../mysqlcdc_to_dm_with_schema_change.conf         |  57 +++++
 ...lcdc_to_dm_with_schema_change_exactly_once.conf |  55 +++++
 15 files changed, 620 insertions(+), 32 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 9734e7000d..4a25bf2caf 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1139,6 +1139,34 @@ jobs:
         env:
           MAVEN_OPTS: -Xmx4096m
 
+  jdbc-connectors-it-ddl:
+    needs: [ changes, sanity-check ]
+    if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 
'true'
+    runs-on: ${{ matrix.os }}
+    env:
+      RUN_ALL_CONTAINER: ${{ needs.changes.outputs.api }}
+      RUN_ZETA_CONTAINER: ${{ needs.changes.outputs.engine }}
+    strategy:
+      matrix:
+        java: [ '8', '11' ]
+        os: [ 'ubuntu-latest' ]
+    timeout-minutes: 120
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK ${{ matrix.java }}
+        uses: actions/setup-java@v3
+        with:
+          java-version: ${{ matrix.java }}
+          distribution: 'temurin'
+          cache: 'maven'
+      - name: free disk space
+        run: tools/github/free_disk_space.sh
+      - name: run jdbc connectors integration test (sink ddl)
+        run: |
+          ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false 
-D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl 
:connector-jdbc-e2e-ddl -am -Pci
+        env:
+          MAVEN_OPTS: -Xmx4096m
+
   kudu-connector-it:
     needs: [ changes, sanity-check ]
     if: needs.changes.outputs.api == 'true' || 
contains(needs.changes.outputs.it-modules, 'connector-kudu-e2e')
diff --git a/docs/en/concept/schema-evolution.md 
b/docs/en/concept/schema-evolution.md
index 88970a0b42..4e29c50d67 100644
--- a/docs/en/concept/schema-evolution.md
+++ b/docs/en/concept/schema-evolution.md
@@ -22,16 +22,20 @@ Schema Evolution means that the schema of a data table can 
be changed and the da
 
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
 
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
 
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
+[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
 
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md)
 
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Doris.md)
 
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Paimon.md#Schema-Evolution)
 
[Elasticsearch](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Elasticsearch.md#Schema-Evolution)
 
-Note: The schema evolution is not support the transform at now. The schema 
evolution of different types of databases(Oracle-CDC -> Jdbc-Mysql)is currently 
not supported the default value of the column in ddl.
+Note:  
+* The schema evolution is not support the transform at now. The schema 
evolution of different types of databases(Oracle-CDC -> Jdbc-Mysql)is currently 
not supported the default value of the column in ddl.
 
-When you use the Oracle-CDC,you can not use the username named `SYS` or 
`SYSTEM` to modify the table schema, otherwise the ddl event will be filtered 
out which can lead to the schema evolution not working.
+* When you use the Oracle-CDC,you can not use the username named `SYS` or 
`SYSTEM` to modify the table schema, otherwise the ddl event will be filtered 
out which can lead to the schema evolution not working.
 Otherwise, If your table name start with `ORA_TEMP_` will also has the same 
problem.
 
+* Earlier versions of `Dameng` databases do not support the change of 
`Varchar` type fields to `Text` type fields.
+
 ## Enable schema evolution
 Schema evolution is disabled by default in CDC source. You need configure 
`schema-changes.enabled = true` which is only supported in CDC to enable it.
 
@@ -287,4 +291,45 @@ sink {
     multi_table_sink_replica = 2
   }
 }
+```
+
+### Mysql-CDC -> Jdbc-Dameng
+```hocon
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:dm://e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    generate_sink_sql = true
+    database = "DAMENG"
+    table = "SYSDBA.sink_table_with_schema_change"
+    primary_keys = ["id"]
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
+  }
+}
 ```
\ No newline at end of file
diff --git a/docs/zh/concept/schema-evolution.md 
b/docs/zh/concept/schema-evolution.md
index 1342759eaa..0c793bd6bf 100644
--- a/docs/zh/concept/schema-evolution.md
+++ b/docs/zh/concept/schema-evolution.md
@@ -22,16 +22,20 @@
 
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
 
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
 
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
+[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
 
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/StarRocks.md)
 
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Doris.md)
 
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Paimon.md#模式演变)
 
[Elasticsearch](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Elasticsearch.md#模式演变)
 
-注意: 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。
+注意: 
+* 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。
 
-当你使用Oracle-CDC时,你不能使用用户名`SYS`或`SYSTEM`来修改表结构,否则ddl事件将被过滤,这可能导致模式演进不起作用;
+* 当你使用Oracle-CDC时,你不能使用用户名`SYS`或`SYSTEM`来修改表结构,否则ddl事件将被过滤,这可能导致模式演进不起作用;
 另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。
 
+* 早期版本的`达梦`数据库不支持将`Varchar`类型字段更改为`Text`类型字段。
+
 ## 启用Schema evolution功能
 在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`schema-changes.enabled = true`来启用它。
 
@@ -288,4 +292,45 @@ sink {
     multi_table_sink_replica = 2
   }
 }
+```
+
+### Mysql-CDC -> Jdbc-Dameng
+```hocon
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:dm://e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    generate_sink_sql = true
+    database = "DAMENG"
+    table = "SYSDBA.sink_table_with_schema_change"
+    primary_keys = ["id"]
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
+  }
+}
 ```
\ No newline at end of file
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
index 0cc93804da..bddd2c33f0 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
@@ -29,6 +29,7 @@ import lombok.ToString;
 public class AlterTableModifyColumnEvent extends AlterTableColumnEvent {
     private final Column column;
     private final boolean first;
+    private Boolean typeChanged;
     private final String afterColumn;
 
     public AlterTableModifyColumnEvent(
@@ -39,6 +40,10 @@ public class AlterTableModifyColumnEvent extends 
AlterTableColumnEvent {
         this.afterColumn = afterColumn;
     }
 
+    public void setTypeChanged(boolean typeChanged) {
+        this.typeChanged = typeChanged;
+    }
+
     public static AlterTableModifyColumnEvent modifyFirst(
             TableIdentifier tableIdentifier, Column column) {
         return new AlterTableModifyColumnEvent(tableIdentifier, column, true, 
null);
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
index 43f92a0a3e..a2a2cdf881 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
@@ -30,6 +30,8 @@ import 
org.apache.seatunnel.api.table.schema.event.AlterTableNameEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
@@ -132,16 +134,25 @@ public class AlterTableSchemaEventHandler implements 
TableSchemaChangeEventHandl
     private TableSchema applyModifyColumn(
             TableSchema schema, AlterTableModifyColumnEvent modifyColumnEvent) 
{
         List<String> fieldNames = Arrays.asList(schema.getFieldNames());
-        if (!fieldNames.contains(modifyColumnEvent.getColumn().getName())) {
+        Column modifyColumn = modifyColumnEvent.getColumn();
+        if (!fieldNames.contains(modifyColumn.getName())) {
             return schema;
         }
-
-        String modifyColumnName = modifyColumnEvent.getColumn().getName();
+        String modifyColumnName = modifyColumn.getName();
         int modifyColumnIndex = fieldNames.indexOf(modifyColumnName);
+        Column oldColumn = schema.getColumns().get(modifyColumnIndex);
+        String oldColumnSourceType = oldColumn.getSourceType();
+        String modifyColumnSourceType = modifyColumn.getSourceType();
+        if (StringUtils.isNoneEmpty(oldColumnSourceType)
+                && StringUtils.isNoneEmpty(modifyColumnSourceType)
+                && !oldColumnSourceType.split("\\(")[0].equals(
+                        modifyColumnSourceType.split("\\(")[0])) {
+            modifyColumnEvent.setTypeChanged(true);
+        }
         return applyModifyColumn(
                 schema,
                 modifyColumnIndex,
-                modifyColumnEvent.getColumn(),
+                modifyColumn,
                 modifyColumnEvent.isFirst(),
                 modifyColumnEvent.getAfterColumn());
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
index dd3965e843..ba701ad19e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -17,17 +17,43 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_CHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_CHARACTER;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_CLOB;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_LONG;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_LONGVARCHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_NVARCHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_TEXT;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_VARCHAR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_VARCHAR2;
+
+@Slf4j
 public class DmdbDialect implements JdbcDialect {
 
     public String fieldIde;
@@ -157,4 +183,207 @@ public class DmdbDialect implements JdbcDialect {
 
         return "\"" + getFieldIde(identifier, fieldIde) + "\"";
     }
+
+    @Override
+    public TypeConverter<BasicTypeDefine> getTypeConverter() {
+        return DmdbTypeConverter.INSTANCE;
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableAddColumnEvent event)
+            throws SQLException {
+        List<String> ddlSQL = new ArrayList<>();
+        Column column = event.getColumn();
+        String sourceDialectName = event.getSourceDialectName();
+        boolean sameCatalog = StringUtils.equals(dialectName(), 
sourceDialectName);
+        BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+        String columnType = sameCatalog ? column.getSourceType() : 
typeDefine.getColumnType();
+
+        // Build the SQL statement that add the column
+        StringBuilder sqlBuilder =
+                new StringBuilder()
+                        .append("ALTER TABLE ")
+                        .append(tableIdentifier(tablePath))
+                        .append(" ADD ")
+                        .append(quoteIdentifier(column.getName()))
+                        .append(" ")
+                        .append(columnType);
+
+        if (column.getDefaultValue() != null
+                && !column.isNullable()
+                && (sameCatalog
+                        || !isSpecialDefaultValue(
+                                typeDefine.getDefaultValue(), 
sourceDialectName))) {
+            // Handle default values and null constraints
+            String defaultValueClause = sqlClauseWithDefaultValue(typeDefine, 
sourceDialectName);
+            sqlBuilder.append(" NOT NULL ").append(defaultValueClause);
+        } else {
+            // If the column is nullable or the default value is not supported,
+            // the NULL constraint is added.
+            if (column.getDefaultValue() != null
+                    && isSpecialDefaultValue(typeDefine.getDefaultValue(), 
sourceDialectName)) {
+                log.warn(
+                        "Skipping unsupported default value for column {} in 
table {}. Using NULL constraint instead.",
+                        column.getName(),
+                        tablePath.getFullName());
+            }
+            sqlBuilder.append(" NULL");
+        }
+        ddlSQL.add(sqlBuilder.toString());
+
+        // Process column comment
+        if (column.getComment() != null) {
+            ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+        }
+
+        // Execute the DDL statement
+        executeDDL(connection, ddlSQL);
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableChangeColumnEvent event)
+            throws SQLException {
+        List<String> ddlSQL = new ArrayList<>();
+        if (event.getOldColumn() != null
+                && 
!(event.getColumn().getName().equals(event.getOldColumn()))) {
+            StringBuilder sqlBuilder =
+                    new StringBuilder()
+                            .append("ALTER TABLE ")
+                            .append(tableIdentifier(tablePath))
+                            .append(" RENAME COLUMN ")
+                            .append(quoteIdentifier(event.getOldColumn()))
+                            .append(" TO ")
+                            
.append(quoteIdentifier(event.getColumn().getName()));
+            ddlSQL.add(sqlBuilder.toString());
+        }
+
+        executeDDL(connection, ddlSQL);
+
+        if (event.getColumn().getDataType() != null) {
+            applySchemaChange(
+                    connection,
+                    tablePath,
+                    
AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn()));
+        }
+    }
+
+    @Override
+    public void applySchemaChange(
+            Connection connection, TablePath tablePath, 
AlterTableModifyColumnEvent event)
+            throws SQLException {
+        Column column = event.getColumn();
+        String sourceDialectName = event.getSourceDialectName();
+        boolean sameCatalog = StringUtils.equals(dialectName(), 
sourceDialectName);
+        // string conversion length will be extended by 4 in cross-database.
+        // eg: mysql varchar(10) -> Dameng varchar(40)
+        BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+        String columnType = sameCatalog ? column.getSourceType() : 
typeDefine.getColumnType();
+        if (event.getTypeChanged() != null
+                && event.getTypeChanged()
+                && DM_TEXT.equals(typeDefine.getColumnType())) {
+            log.warn(
+                    "DamengDB does not support modifying the TEXT type 
directly. "
+                            + "Please use ALTER TABLE MODIFY COLUMN to change 
the column type.");
+        }
+        // Build the SQL statement that modifies the column
+        StringBuilder sqlBuilder =
+                new StringBuilder("ALTER TABLE ")
+                        .append(tableIdentifier(tablePath))
+                        .append(" MODIFY ")
+                        .append(quoteIdentifier(column.getName()))
+                        .append(" ")
+                        .append(columnType);
+
+        // Handle null constraints
+        // DamengDB does not direct support modifying the NULL to NOT-NUll 
constraint directly.
+        // if supported, need update null value to defaultvalue, then modify 
the column to NOT NULL.
+        // this is a high-risk operation, so we do not support it.
+        boolean targetColumnNullable = columnIsNullable(connection, tablePath, 
column.getName());
+        if (column.isNullable() != targetColumnNullable && 
!targetColumnNullable) {
+            sqlBuilder.append(" NULL ");
+        }
+
+        // Handle default value
+        if (column.getDefaultValue() != null) {
+            if (sameCatalog
+                    || !isSpecialDefaultValue(typeDefine.getDefaultValue(), 
sourceDialectName)) {
+                String defaultValueClause =
+                        sqlClauseWithDefaultValue(typeDefine, 
sourceDialectName);
+                sqlBuilder.append(" ").append(defaultValueClause);
+            } else {
+                log.warn(
+                        "Skipping unsupported default value for column {} in 
table {}.",
+                        column.getName(),
+                        tablePath.getFullName());
+            }
+        }
+        List<String> ddlSQL = new ArrayList<>();
+        ddlSQL.add(sqlBuilder.toString());
+        // Process column comment
+        if (column.getComment() != null) {
+            ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+        }
+        // Execute the DDL statement
+        executeDDL(connection, ddlSQL);
+    }
+
+    @Override
+    public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
+        String dmDataType = columnDefine.getDataType();
+        switch (dmDataType) {
+            case DM_CHAR:
+            case DM_CHARACTER:
+            case DM_VARCHAR:
+            case DM_VARCHAR2:
+            case DM_NVARCHAR:
+            case DM_LONGVARCHAR:
+            case DM_CLOB:
+            case DM_TEXT:
+            case DM_LONG:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    private void executeDDL(Connection connection, List<String> ddlSQL) throws 
SQLException {
+        try (Statement statement = connection.createStatement()) {
+            for (String sql : ddlSQL) {
+                log.info("Executing DDL SQL: {}", sql);
+                statement.execute(sql);
+            }
+        } catch (SQLException e) {
+            throw new SQLException("Error executing DDL SQL: " + ddlSQL, 
e.getSQLState(), e);
+        }
+    }
+
+    private String buildColumnCommentSQL(TablePath tablePath, Column column) {
+        return String.format(
+                "COMMENT ON COLUMN %s.%s IS '%s'",
+                tableIdentifier(tablePath), quoteIdentifier(column.getName()), 
column.getComment());
+    }
+
+    private boolean columnIsNullable(Connection connection, TablePath 
tablePath, String column)
+            throws SQLException {
+        String selectColumnSQL =
+                "SELECT"
+                        + "        NULLABLE FROM"
+                        + "        ALL_TAB_COLUMNS c"
+                        + "        WHERE c.owner = '"
+                        + tablePath.getSchemaName()
+                        + "'"
+                        + "        AND c.table_name = '"
+                        + tablePath.getTableName()
+                        + "'"
+                        + "        AND c.column_name = '"
+                        + column
+                        + "'";
+        try (Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery(selectColumnSQL);
+            rs.next();
+            return rs.getString("NULLABLE").equals("Y");
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
index e78a1e5fa1..711953ece0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
@@ -23,7 +23,7 @@
     </parent>
 
     <artifactId>connector-jdbc-e2e-ddl</artifactId>
-    <name>SeaTunnel : Connectors V2 : JDBC Sink : Schema Evolution</name>
+    <name>SeaTunnel : E2E : Connector V2 : JDBC : Schema Evolution</name>
 
     <dependencyManagement>
         <dependencies>
@@ -48,23 +48,16 @@
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-cdc-mysql</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-transforms-v2</artifactId>
+            <artifactId>connector-jdbc</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-assert</artifactId>
+            <artifactId>connector-cdc-mysql</artifactId>
             <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
 
@@ -80,6 +73,7 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- jdbc sink dirver -->
         <dependency>
             <!-- fix CVE-2022-26520 
https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520  -->
             <groupId>org.postgresql</groupId>
@@ -87,6 +81,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.dameng</groupId>
+            <artifactId>DmJdbcDriver18</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- jdbc sink container image-->
         <dependency>
             <groupId>org.testcontainers</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
index ce2fd9a373..06c13b4153 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
@@ -47,9 +47,12 @@ import org.testcontainers.utility.DockerLoggerFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.Reader;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.NClob;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
@@ -101,6 +104,10 @@ public abstract class AbstractSchemaChangeBaseIT extends 
TestSuiteBase implement
 
     protected abstract GenericContainer initSinkContainer();
 
+    protected abstract String sinkDatabaseType();
+
+    protected void intializeSinkDatabase() {}
+
     @BeforeAll
     @Override
     public void startUp() {
@@ -110,11 +117,12 @@ public abstract class AbstractSchemaChangeBaseIT extends 
TestSuiteBase implement
         log.info("Mysql Containers are started");
         sourceDatabase.createAndInitialize();
         log.info("Mysql ddl execution is complete");
-
-        log.info("The third stage: Starting {} containers...", 
schemaChangeCase.getDbType());
+        // sink database initialization
+        log.info("The third stage: Starting {} containers...", 
sinkDatabaseType());
         sinkDbServer = 
initSinkContainer().withImagePullPolicy(PullPolicy.defaultPolicy());
         Startables.deepStart(Stream.of(sinkDbServer)).join();
-        log.info("{} Containers are started", schemaChangeCase.getDbType());
+        log.info("{} Containers are started", sinkDatabaseType());
+        intializeSinkDatabase();
     }
 
     @AfterAll
@@ -255,7 +263,7 @@ public abstract class AbstractSchemaChangeBaseIT extends 
TestSuiteBase implement
         if (!schemaChangeCase.isOpenExactlyOnce()) {
             log.info(
                     "{} not support Xa transactions, Skip 
testMysqlCdcWithSchemaEvolutionCaseExactlyOnce",
-                    schemaChangeCase.getDbType());
+                    sinkDatabaseType());
             return;
         }
         String jobConfigFile = 
schemaChangeCase.getSchemaEvolutionCaseExactlyOnce();
@@ -492,12 +500,17 @@ public abstract class AbstractSchemaChangeBaseIT extends 
TestSuiteBase implement
             while (resultSet.next()) {
                 ArrayList<Object> objects = new ArrayList<>();
                 for (int i = 1; i <= columnCount; i++) {
-                    objects.add(resultSet.getObject(i));
+                    Object object = resultSet.getObject(i);
+                    if (object instanceof NClob) {
+                        objects.add(readNClobAsString((NClob) object));
+                    } else {
+                        objects.add(object);
+                    }
                 }
                 log.debug(
                         String.format(
                                 "Print %s query, sql: %s, data: %s",
-                                schemaChangeCase.getDbType(), sql, objects));
+                                sinkDatabaseType(), sql, objects));
                 result.add(objects);
             }
             return result;
@@ -505,4 +518,18 @@ public abstract class AbstractSchemaChangeBaseIT extends 
TestSuiteBase implement
             throw new RuntimeException(e);
         }
     }
+
+    private Object readNClobAsString(NClob nclob) {
+        try (Reader reader = nclob.getCharacterStream();
+                BufferedReader bufferedReader = new BufferedReader(reader)) {
+            StringBuilder stringBuilder = new StringBuilder();
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                stringBuilder.append(line);
+            }
+            return stringBuilder.toString();
+        } catch (SQLException | IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/DmSchemaChangeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/DmSchemaChangeIT.java
new file mode 100644
index 0000000000..82d8a95fbf
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/DmSchemaChangeIT.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+public class DmSchemaChangeIT extends AbstractSchemaChangeBaseIT {
+
+    private static final String DATABASE_TYPE = "Dameng";
+    private static final String DM_IMAGE = "laglangyue/dmdb8";
+    private static final String DM_CONTAINER_HOST = "e2e_dmdb";
+    private static final String DM_DATABASE = "SYSDBA";
+    private static final String DM_USERNAME = "SYSDBA";
+    private static final String DM_PASSWORD = "SYSDBA";
+    private static final int DM_PORT = 5236;
+    private static final String DM_URL = "jdbc:dm://%s:%s/%s";
+
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+
+    private static final String DM_DRIVER_JAR =
+            
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar";;
+    private final String schemaEvolutionCase_config = 
"/mysqlcdc_to_dm_with_schema_change.conf";
+    private final String schemaEvolutionCaseExactlyOnce_config =
+            "/mysqlcdc_to_dm_with_schema_change_exactly_once.conf";
+    private final String QUERRY_COLUMNS =
+            "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = '%s' AND 
TABLE_NAME = '%s' ORDER by COLUMN_NAME";
+
+    @Override
+    protected SchemaChangeCase getSchemaChangeCase() {
+        return SchemaChangeCase.builder()
+                .jdbcUrl(DM_URL)
+                .username(DM_USERNAME)
+                .password(DM_PASSWORD)
+                .driverUrl(DM_DRIVER_JAR)
+                .port(DM_PORT)
+                .driverClassName(DRIVER_CLASS)
+                .databaseName(DM_DATABASE)
+                .schemaName(DM_USERNAME)
+                .schemaEvolutionCase(schemaEvolutionCase_config)
+                .sinkTable1(SINK_TABLE1)
+                .openExactlyOnce(true)
+                
.schemaEvolutionCaseExactlyOnce(schemaEvolutionCaseExactlyOnce_config)
+                .sinkTable2(SINK_TABLE2)
+                .sinkQueryColumns(QUERRY_COLUMNS)
+                .build();
+    }
+
+    @Override
+    protected GenericContainer initSinkContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(DM_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(DM_CONTAINER_HOST)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DM_IMAGE)));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
DM_PORT, DM_PORT)));
+        container.setPrivilegedMode(true);
+        return container;
+    }
+
+    @Override
+    protected String sinkDatabaseType() {
+        return DATABASE_TYPE;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
index bce7562c96..623a08c17a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
@@ -49,7 +49,6 @@ public class PostgresSchemaChangeIT extends 
AbstractSchemaChangeBaseIT {
     @Override
     protected SchemaChangeCase getSchemaChangeCase() {
         return SchemaChangeCase.builder()
-                .dbType(DATABASE_TYPE)
                 .jdbcUrl(PG_JDBC_URL)
                 .username(PG_USER)
                 .password(PG_PASSWORD)
@@ -84,4 +83,9 @@ public class PostgresSchemaChangeIT extends 
AbstractSchemaChangeBaseIT {
         container.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
PG_PORT, PG_PORT)));
         return container;
     }
+
+    @Override
+    protected String sinkDatabaseType() {
+        return DATABASE_TYPE;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
index f138ecc35b..71384c3d53 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
@@ -23,8 +23,6 @@ import lombok.Data;
 @Data
 @Builder
 public class SchemaChangeCase {
-
-    private String dbType;
     private String driverUrl;
     private String jdbcUrl;
     private String driverClassName;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
index 1abcc9bbad..2f7dcb47cb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
@@ -21,7 +21,7 @@
 CREATE DATABASE IF NOT EXISTS `shop`;
 use shop;
 
-alter table products modify name longtext null;
+alter table products modify name VARCHAR(400) null;
 delete from products where id < 155;
 insert into products
 values (164,"scooter","Small 2-wheel scooter",3.14,1),
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
index eb258d1840..0eb9d6d787 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
@@ -25,7 +25,7 @@ drop table if exists products;
 -- Create and populate our products using a single insert with many rows
 CREATE TABLE products (
   id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
-  name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+  name VARCHAR(150) NOT NULL DEFAULT 'SeaTunnel',
   description VARCHAR(512),
   weight DECIMAL(8,2)
 );
@@ -33,7 +33,7 @@ CREATE TABLE products (
 drop table if exists mysql_cdc_e2e_sink_table_with_schema_change;
 CREATE TABLE if not exists mysql_cdc_e2e_sink_table_with_schema_change (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ name VARCHAR(150) NOT NULL DEFAULT 'SeaTunnel',
  description VARCHAR(512),
  weight DECIMAL(8,2)
 );
@@ -41,7 +41,7 @@ CREATE TABLE if not exists 
mysql_cdc_e2e_sink_table_with_schema_change (
 drop table if exists mysql_cdc_e2e_sink_table_with_schema_change_exactly_once;
 CREATE TABLE if not exists 
mysql_cdc_e2e_sink_table_with_schema_change_exactly_once (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ name VARCHAR(150) NOT NULL DEFAULT 'SeaTunnel',
  description VARCHAR(512),
  weight DECIMAL(8,2)
 );
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change.conf
new file mode 100644
index 0000000000..a0412fa9d0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:dm://e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    generate_sink_sql = true
+    database = "DAMENG"
+    table = "SYSDBA.sink_table_with_schema_change"
+    primary_keys = ["id"]
+
+    # Validate ddl update for sink writer multi replica
+    multi_table_sink_replica = 2
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change_exactly_once.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change_exactly_once.conf
new file mode 100644
index 0000000000..7065389f05
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change_exactly_once.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 5
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  MySQL-CDC {
+    server-id = 5652-5657
+    username = "st_user_source"
+    password = "mysqlpw"
+    table-names = ["shop.products"]
+    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+    schema-changes.enabled = true
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:dm://e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    database = "DAMENG"
+    generate_sink_sql = true
+    table = "SYSDBA.sink_table_with_schema_change_exactly_once"
+    primary_keys = ["id"]
+    xa_data_source_class_name = "dm.jdbc.driver.DmdbXADataSource"
+  }
+}


Reply via email to