This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5ff3427428 [Feature][Jdbc] Support sink ddl for dameng (#8380)
5ff3427428 is described below
commit 5ff3427428bf40d041c420dc693d9734e2ed56c0
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Feb 13 14:08:59 2025 +0800
[Feature][Jdbc] Support sink ddl for dameng (#8380)
---
.github/workflows/backend.yml | 28 +++
docs/en/concept/schema-evolution.md | 49 ++++-
docs/zh/concept/schema-evolution.md | 49 ++++-
.../schema/event/AlterTableModifyColumnEvent.java | 5 +
.../handler/AlterTableSchemaEventHandler.java | 19 +-
.../jdbc/internal/dialect/dm/DmdbDialect.java | 229 +++++++++++++++++++++
.../connector-jdbc-e2e-ddl/pom.xml | 22 +-
.../jdbc/AbstractSchemaChangeBaseIT.java | 39 +++-
.../connectors/jdbc/DmSchemaChangeIT.java | 84 ++++++++
.../connectors/jdbc/PostgresSchemaChangeIT.java | 6 +-
.../connectors/jdbc/SchemaChangeCase.java | 2 -
.../src/test/resources/ddl/modify_columns.sql | 2 +-
.../src/test/resources/ddl/shop.sql | 6 +-
.../mysqlcdc_to_dm_with_schema_change.conf | 57 +++++
...lcdc_to_dm_with_schema_change_exactly_once.conf | 55 +++++
15 files changed, 620 insertions(+), 32 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 9734e7000d..4a25bf2caf 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -1139,6 +1139,34 @@ jobs:
env:
MAVEN_OPTS: -Xmx4096m
+ jdbc-connectors-it-ddl:
+ needs: [ changes, sanity-check ]
+ if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine ==
'true'
+ runs-on: ${{ matrix.os }}
+ env:
+ RUN_ALL_CONTAINER: ${{ needs.changes.outputs.api }}
+ RUN_ZETA_CONTAINER: ${{ needs.changes.outputs.engine }}
+ strategy:
+ matrix:
+ java: [ '8', '11' ]
+ os: [ 'ubuntu-latest' ]
+ timeout-minutes: 120
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v3
+ with:
+ java-version: ${{ matrix.java }}
+ distribution: 'temurin'
+ cache: 'maven'
+ - name: free disk space
+ run: tools/github/free_disk_space.sh
+ - name: run jdbc connectors integration test (sink ddl)
+ run: |
+ ./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false
-D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates -pl
:connector-jdbc-e2e-ddl -am -Pci
+ env:
+ MAVEN_OPTS: -Xmx4096m
+
kudu-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true' ||
contains(needs.changes.outputs.it-modules, 'connector-kudu-e2e')
diff --git a/docs/en/concept/schema-evolution.md
b/docs/en/concept/schema-evolution.md
index 88970a0b42..4e29c50d67 100644
--- a/docs/en/concept/schema-evolution.md
+++ b/docs/en/concept/schema-evolution.md
@@ -22,16 +22,20 @@ Schema Evolution means that the schema of a data table can
be changed and the da
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
+[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md)
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Doris.md)
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Paimon.md#Schema-Evolution)
[Elasticsearch](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Elasticsearch.md#Schema-Evolution)
-Note: The schema evolution is not support the transform at now. The schema
evolution of different types of databases(Oracle-CDC -> Jdbc-Mysql)is currently
not supported the default value of the column in ddl.
+Note:
+* The schema evolution is not support the transform at now. The schema
evolution of different types of databases(Oracle-CDC -> Jdbc-Mysql)is currently
not supported the default value of the column in ddl.
-When you use the Oracle-CDC,you can not use the username named `SYS` or
`SYSTEM` to modify the table schema, otherwise the ddl event will be filtered
out which can lead to the schema evolution not working.
+* When you use the Oracle-CDC,you can not use the username named `SYS` or
`SYSTEM` to modify the table schema, otherwise the ddl event will be filtered
out which can lead to the schema evolution not working.
Otherwise, If your table name start with `ORA_TEMP_` will also has the same
problem.
+* Earlier versions of `Dameng` databases do not support the change of
`Varchar` type fields to `Text` type fields.
+
## Enable schema evolution
Schema evolution is disabled by default in CDC source. You need configure
`schema-changes.enabled = true` which is only supported in CDC to enable it.
@@ -287,4 +291,45 @@ sink {
multi_table_sink_replica = 2
}
}
+```
+
+### Mysql-CDC -> Jdbc-Dameng
+```hocon
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:dm://e2e_dmdb:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ generate_sink_sql = true
+ database = "DAMENG"
+ table = "SYSDBA.sink_table_with_schema_change"
+ primary_keys = ["id"]
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
+ }
+}
```
\ No newline at end of file
diff --git a/docs/zh/concept/schema-evolution.md
b/docs/zh/concept/schema-evolution.md
index 1342759eaa..0c793bd6bf 100644
--- a/docs/zh/concept/schema-evolution.md
+++ b/docs/zh/concept/schema-evolution.md
@@ -22,16 +22,20 @@
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[Jdbc-Postgres](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
+[Jdbc-Dameng](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/StarRocks.md)
[Doris](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Doris.md)
[Paimon](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Paimon.md#模式演变)
[Elasticsearch](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Elasticsearch.md#模式演变)
-注意: 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。
+注意:
+* 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。
-当你使用Oracle-CDC时,你不能使用用户名`SYS`或`SYSTEM`来修改表结构,否则ddl事件将被过滤,这可能导致模式演进不起作用;
+* 当你使用Oracle-CDC时,你不能使用用户名`SYS`或`SYSTEM`来修改表结构,否则ddl事件将被过滤,这可能导致模式演进不起作用;
另外,如果你的表名以`ORA_TEMP_`开头,也会有相同的问题。
+* 早期版本的`达梦`数据库不支持将`Varchar`类型字段更改为`Text`类型字段。
+
## 启用Schema evolution功能
在CDC源连接器中模式演进默认是关闭的。你需要在CDC连接器中配置`schema-changes.enabled = true`来启用它。
@@ -288,4 +292,45 @@ sink {
multi_table_sink_replica = 2
}
}
+```
+
+### Mysql-CDC -> Jdbc-Dameng
+```hocon
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:dm://e2e_dmdb:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ generate_sink_sql = true
+ database = "DAMENG"
+ table = "SYSDBA.sink_table_with_schema_change"
+ primary_keys = ["id"]
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
+ }
+}
```
\ No newline at end of file
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
index 0cc93804da..bddd2c33f0 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/event/AlterTableModifyColumnEvent.java
@@ -29,6 +29,7 @@ import lombok.ToString;
public class AlterTableModifyColumnEvent extends AlterTableColumnEvent {
private final Column column;
private final boolean first;
+ private Boolean typeChanged;
private final String afterColumn;
public AlterTableModifyColumnEvent(
@@ -39,6 +40,10 @@ public class AlterTableModifyColumnEvent extends
AlterTableColumnEvent {
this.afterColumn = afterColumn;
}
+ public void setTypeChanged(boolean typeChanged) {
+ this.typeChanged = typeChanged;
+ }
+
public static AlterTableModifyColumnEvent modifyFirst(
TableIdentifier tableIdentifier, Column column) {
return new AlterTableModifyColumnEvent(tableIdentifier, column, true,
null);
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
index 43f92a0a3e..a2a2cdf881 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/schema/handler/AlterTableSchemaEventHandler.java
@@ -30,6 +30,8 @@ import
org.apache.seatunnel.api.table.schema.event.AlterTableNameEvent;
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -132,16 +134,25 @@ public class AlterTableSchemaEventHandler implements
TableSchemaChangeEventHandl
private TableSchema applyModifyColumn(
TableSchema schema, AlterTableModifyColumnEvent modifyColumnEvent)
{
List<String> fieldNames = Arrays.asList(schema.getFieldNames());
- if (!fieldNames.contains(modifyColumnEvent.getColumn().getName())) {
+ Column modifyColumn = modifyColumnEvent.getColumn();
+ if (!fieldNames.contains(modifyColumn.getName())) {
return schema;
}
-
- String modifyColumnName = modifyColumnEvent.getColumn().getName();
+ String modifyColumnName = modifyColumn.getName();
int modifyColumnIndex = fieldNames.indexOf(modifyColumnName);
+ Column oldColumn = schema.getColumns().get(modifyColumnIndex);
+ String oldColumnSourceType = oldColumn.getSourceType();
+ String modifyColumnSourceType = modifyColumn.getSourceType();
+ if (StringUtils.isNoneEmpty(oldColumnSourceType)
+ && StringUtils.isNoneEmpty(modifyColumnSourceType)
+ && !oldColumnSourceType.split("\\(")[0].equals(
+ modifyColumnSourceType.split("\\(")[0])) {
+ modifyColumnEvent.setTypeChanged(true);
+ }
return applyModifyColumn(
schema,
modifyColumnIndex,
- modifyColumnEvent.getColumn(),
+ modifyColumn,
modifyColumnEvent.isFirst(),
modifyColumnEvent.getAfterColumn());
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
index dd3965e843..ba701ad19e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -17,17 +17,43 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
+import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableChangeColumnEvent;
+import org.apache.seatunnel.api.table.schema.event.AlterTableModifyColumnEvent;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+import org.apache.commons.lang3.StringUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_CHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_CHARACTER;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_CLOB;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_LONG;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_LONGVARCHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_NVARCHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_TEXT;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_VARCHAR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter.DM_VARCHAR2;
+
+@Slf4j
public class DmdbDialect implements JdbcDialect {
public String fieldIde;
@@ -157,4 +183,207 @@ public class DmdbDialect implements JdbcDialect {
return "\"" + getFieldIde(identifier, fieldIde) + "\"";
}
+
+ @Override
+ public TypeConverter<BasicTypeDefine> getTypeConverter() {
+ return DmdbTypeConverter.INSTANCE;
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableAddColumnEvent event)
+ throws SQLException {
+ List<String> ddlSQL = new ArrayList<>();
+ Column column = event.getColumn();
+ String sourceDialectName = event.getSourceDialectName();
+ boolean sameCatalog = StringUtils.equals(dialectName(),
sourceDialectName);
+ BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+ String columnType = sameCatalog ? column.getSourceType() :
typeDefine.getColumnType();
+
+ // Build the SQL statement that add the column
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE ")
+ .append(tableIdentifier(tablePath))
+ .append(" ADD ")
+ .append(quoteIdentifier(column.getName()))
+ .append(" ")
+ .append(columnType);
+
+ if (column.getDefaultValue() != null
+ && !column.isNullable()
+ && (sameCatalog
+ || !isSpecialDefaultValue(
+ typeDefine.getDefaultValue(),
sourceDialectName))) {
+ // Handle default values and null constraints
+ String defaultValueClause = sqlClauseWithDefaultValue(typeDefine,
sourceDialectName);
+ sqlBuilder.append(" NOT NULL ").append(defaultValueClause);
+ } else {
+ // If the column is nullable or the default value is not supported,
+ // the NULL constraint is added.
+ if (column.getDefaultValue() != null
+ && isSpecialDefaultValue(typeDefine.getDefaultValue(),
sourceDialectName)) {
+ log.warn(
+ "Skipping unsupported default value for column {} in
table {}. Using NULL constraint instead.",
+ column.getName(),
+ tablePath.getFullName());
+ }
+ sqlBuilder.append(" NULL");
+ }
+ ddlSQL.add(sqlBuilder.toString());
+
+ // Process column comment
+ if (column.getComment() != null) {
+ ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+ }
+
+ // Execute the DDL statement
+ executeDDL(connection, ddlSQL);
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableChangeColumnEvent event)
+ throws SQLException {
+ List<String> ddlSQL = new ArrayList<>();
+ if (event.getOldColumn() != null
+ &&
!(event.getColumn().getName().equals(event.getOldColumn()))) {
+ StringBuilder sqlBuilder =
+ new StringBuilder()
+ .append("ALTER TABLE ")
+ .append(tableIdentifier(tablePath))
+ .append(" RENAME COLUMN ")
+ .append(quoteIdentifier(event.getOldColumn()))
+ .append(" TO ")
+
.append(quoteIdentifier(event.getColumn().getName()));
+ ddlSQL.add(sqlBuilder.toString());
+ }
+
+ executeDDL(connection, ddlSQL);
+
+ if (event.getColumn().getDataType() != null) {
+ applySchemaChange(
+ connection,
+ tablePath,
+
AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn()));
+ }
+ }
+
+ @Override
+ public void applySchemaChange(
+ Connection connection, TablePath tablePath,
AlterTableModifyColumnEvent event)
+ throws SQLException {
+ Column column = event.getColumn();
+ String sourceDialectName = event.getSourceDialectName();
+ boolean sameCatalog = StringUtils.equals(dialectName(),
sourceDialectName);
+ // string conversion length will be extended by 4 in cross-database.
+ // eg: mysql varchar(10) -> Dameng varchar(40)
+ BasicTypeDefine typeDefine = getTypeConverter().reconvert(column);
+ String columnType = sameCatalog ? column.getSourceType() :
typeDefine.getColumnType();
+ if (event.getTypeChanged() != null
+ && event.getTypeChanged()
+ && DM_TEXT.equals(typeDefine.getColumnType())) {
+ log.warn(
+ "DamengDB does not support modifying the TEXT type
directly. "
+ + "Please use ALTER TABLE MODIFY COLUMN to change
the column type.");
+ }
+ // Build the SQL statement that modifies the column
+ StringBuilder sqlBuilder =
+ new StringBuilder("ALTER TABLE ")
+ .append(tableIdentifier(tablePath))
+ .append(" MODIFY ")
+ .append(quoteIdentifier(column.getName()))
+ .append(" ")
+ .append(columnType);
+
+ // Handle null constraints
+ // DamengDB does not direct support modifying the NULL to NOT-NUll
constraint directly.
+ // if supported, need update null value to defaultvalue, then modify
the column to NOT NULL.
+ // this is a high-risk operation, so we do not support it.
+ boolean targetColumnNullable = columnIsNullable(connection, tablePath,
column.getName());
+ if (column.isNullable() != targetColumnNullable &&
!targetColumnNullable) {
+ sqlBuilder.append(" NULL ");
+ }
+
+ // Handle default value
+ if (column.getDefaultValue() != null) {
+ if (sameCatalog
+ || !isSpecialDefaultValue(typeDefine.getDefaultValue(),
sourceDialectName)) {
+ String defaultValueClause =
+ sqlClauseWithDefaultValue(typeDefine,
sourceDialectName);
+ sqlBuilder.append(" ").append(defaultValueClause);
+ } else {
+ log.warn(
+ "Skipping unsupported default value for column {} in
table {}.",
+ column.getName(),
+ tablePath.getFullName());
+ }
+ }
+ List<String> ddlSQL = new ArrayList<>();
+ ddlSQL.add(sqlBuilder.toString());
+ // Process column comment
+ if (column.getComment() != null) {
+ ddlSQL.add(buildColumnCommentSQL(tablePath, column));
+ }
+ // Execute the DDL statement
+ executeDDL(connection, ddlSQL);
+ }
+
+ @Override
+ public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) {
+ String dmDataType = columnDefine.getDataType();
+ switch (dmDataType) {
+ case DM_CHAR:
+ case DM_CHARACTER:
+ case DM_VARCHAR:
+ case DM_VARCHAR2:
+ case DM_NVARCHAR:
+ case DM_LONGVARCHAR:
+ case DM_CLOB:
+ case DM_TEXT:
+ case DM_LONG:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private void executeDDL(Connection connection, List<String> ddlSQL) throws
SQLException {
+ try (Statement statement = connection.createStatement()) {
+ for (String sql : ddlSQL) {
+ log.info("Executing DDL SQL: {}", sql);
+ statement.execute(sql);
+ }
+ } catch (SQLException e) {
+ throw new SQLException("Error executing DDL SQL: " + ddlSQL,
e.getSQLState(), e);
+ }
+ }
+
+ private String buildColumnCommentSQL(TablePath tablePath, Column column) {
+ return String.format(
+ "COMMENT ON COLUMN %s.%s IS '%s'",
+ tableIdentifier(tablePath), quoteIdentifier(column.getName()),
column.getComment());
+ }
+
+ private boolean columnIsNullable(Connection connection, TablePath
tablePath, String column)
+ throws SQLException {
+ String selectColumnSQL =
+ "SELECT"
+ + " NULLABLE FROM"
+ + " ALL_TAB_COLUMNS c"
+ + " WHERE c.owner = '"
+ + tablePath.getSchemaName()
+ + "'"
+ + " AND c.table_name = '"
+ + tablePath.getTableName()
+ + "'"
+ + " AND c.column_name = '"
+ + column
+ + "'";
+ try (Statement statement = connection.createStatement()) {
+ ResultSet rs = statement.executeQuery(selectColumnSQL);
+ rs.next();
+ return rs.getString("NULLABLE").equals("Y");
+ }
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
index e78a1e5fa1..711953ece0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/pom.xml
@@ -23,7 +23,7 @@
</parent>
<artifactId>connector-jdbc-e2e-ddl</artifactId>
- <name>SeaTunnel : Connectors V2 : JDBC Sink : Schema Evolution</name>
+ <name>SeaTunnel : E2E : Connector V2 : JDBC : Schema Evolution</name>
<dependencyManagement>
<dependencies>
@@ -48,23 +48,16 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-cdc-mysql</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-transforms-v2</artifactId>
+ <artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-assert</artifactId>
+ <artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
+ <type>test-jar</type>
<scope>test</scope>
</dependency>
@@ -80,6 +73,7 @@
<scope>test</scope>
</dependency>
+ <!-- jdbc sink dirver -->
<dependency>
<!-- fix CVE-2022-26520
https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 -->
<groupId>org.postgresql</groupId>
@@ -87,6 +81,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.dameng</groupId>
+ <artifactId>DmJdbcDriver18</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- jdbc sink container image-->
<dependency>
<groupId>org.testcontainers</groupId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
index ce2fd9a373..06c13b4153 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/AbstractSchemaChangeBaseIT.java
@@ -47,9 +47,12 @@ import org.testcontainers.utility.DockerLoggerFactory;
import lombok.extern.slf4j.Slf4j;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.Reader;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.NClob;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
@@ -101,6 +104,10 @@ public abstract class AbstractSchemaChangeBaseIT extends
TestSuiteBase implement
protected abstract GenericContainer initSinkContainer();
+ protected abstract String sinkDatabaseType();
+
+ protected void intializeSinkDatabase() {}
+
@BeforeAll
@Override
public void startUp() {
@@ -110,11 +117,12 @@ public abstract class AbstractSchemaChangeBaseIT extends
TestSuiteBase implement
log.info("Mysql Containers are started");
sourceDatabase.createAndInitialize();
log.info("Mysql ddl execution is complete");
-
- log.info("The third stage: Starting {} containers...",
schemaChangeCase.getDbType());
+ // sink database initialization
+ log.info("The third stage: Starting {} containers...",
sinkDatabaseType());
sinkDbServer =
initSinkContainer().withImagePullPolicy(PullPolicy.defaultPolicy());
Startables.deepStart(Stream.of(sinkDbServer)).join();
- log.info("{} Containers are started", schemaChangeCase.getDbType());
+ log.info("{} Containers are started", sinkDatabaseType());
+ intializeSinkDatabase();
}
@AfterAll
@@ -255,7 +263,7 @@ public abstract class AbstractSchemaChangeBaseIT extends
TestSuiteBase implement
if (!schemaChangeCase.isOpenExactlyOnce()) {
log.info(
"{} not support Xa transactions, Skip
testMysqlCdcWithSchemaEvolutionCaseExactlyOnce",
- schemaChangeCase.getDbType());
+ sinkDatabaseType());
return;
}
String jobConfigFile =
schemaChangeCase.getSchemaEvolutionCaseExactlyOnce();
@@ -492,12 +500,17 @@ public abstract class AbstractSchemaChangeBaseIT extends
TestSuiteBase implement
while (resultSet.next()) {
ArrayList<Object> objects = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
- objects.add(resultSet.getObject(i));
+ Object object = resultSet.getObject(i);
+ if (object instanceof NClob) {
+ objects.add(readNClobAsString((NClob) object));
+ } else {
+ objects.add(object);
+ }
}
log.debug(
String.format(
"Print %s query, sql: %s, data: %s",
- schemaChangeCase.getDbType(), sql, objects));
+ sinkDatabaseType(), sql, objects));
result.add(objects);
}
return result;
@@ -505,4 +518,18 @@ public abstract class AbstractSchemaChangeBaseIT extends
TestSuiteBase implement
throw new RuntimeException(e);
}
}
+
+ private Object readNClobAsString(NClob nclob) {
+ try (Reader reader = nclob.getCharacterStream();
+ BufferedReader bufferedReader = new BufferedReader(reader)) {
+ StringBuilder stringBuilder = new StringBuilder();
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ stringBuilder.append(line);
+ }
+ return stringBuilder.toString();
+ } catch (SQLException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/DmSchemaChangeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/DmSchemaChangeIT.java
new file mode 100644
index 0000000000..82d8a95fbf
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/DmSchemaChangeIT.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+public class DmSchemaChangeIT extends AbstractSchemaChangeBaseIT {
+
+ private static final String DATABASE_TYPE = "Dameng";
+ private static final String DM_IMAGE = "laglangyue/dmdb8";
+ private static final String DM_CONTAINER_HOST = "e2e_dmdb";
+ private static final String DM_DATABASE = "SYSDBA";
+ private static final String DM_USERNAME = "SYSDBA";
+ private static final String DM_PASSWORD = "SYSDBA";
+ private static final int DM_PORT = 5236;
+ private static final String DM_URL = "jdbc:dm://%s:%s/%s";
+
+ private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+
+ private static final String DM_DRIVER_JAR =
+
"https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar";
+ private final String schemaEvolutionCase_config =
"/mysqlcdc_to_dm_with_schema_change.conf";
+ private final String schemaEvolutionCaseExactlyOnce_config =
+ "/mysqlcdc_to_dm_with_schema_change_exactly_once.conf";
+ private final String QUERRY_COLUMNS =
+ "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = '%s' AND
TABLE_NAME = '%s' ORDER by COLUMN_NAME";
+
+ @Override
+ protected SchemaChangeCase getSchemaChangeCase() {
+ return SchemaChangeCase.builder()
+ .jdbcUrl(DM_URL)
+ .username(DM_USERNAME)
+ .password(DM_PASSWORD)
+ .driverUrl(DM_DRIVER_JAR)
+ .port(DM_PORT)
+ .driverClassName(DRIVER_CLASS)
+ .databaseName(DM_DATABASE)
+ .schemaName(DM_USERNAME)
+ .schemaEvolutionCase(schemaEvolutionCase_config)
+ .sinkTable1(SINK_TABLE1)
+ .openExactlyOnce(true)
+
.schemaEvolutionCaseExactlyOnce(schemaEvolutionCaseExactlyOnce_config)
+ .sinkTable2(SINK_TABLE2)
+ .sinkQueryColumns(QUERRY_COLUMNS)
+ .build();
+ }
+
+ @Override
+ protected GenericContainer initSinkContainer() {
+ GenericContainer<?> container =
+ new GenericContainer<>(DM_IMAGE)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(DM_CONTAINER_HOST)
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(DM_IMAGE)));
+ container.setPortBindings(Lists.newArrayList(String.format("%s:%s",
DM_PORT, DM_PORT)));
+ container.setPrivilegedMode(true);
+ return container;
+ }
+
+ @Override
+ protected String sinkDatabaseType() {
+ return DATABASE_TYPE;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
index bce7562c96..623a08c17a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/PostgresSchemaChangeIT.java
@@ -49,7 +49,6 @@ public class PostgresSchemaChangeIT extends
AbstractSchemaChangeBaseIT {
@Override
protected SchemaChangeCase getSchemaChangeCase() {
return SchemaChangeCase.builder()
- .dbType(DATABASE_TYPE)
.jdbcUrl(PG_JDBC_URL)
.username(PG_USER)
.password(PG_PASSWORD)
@@ -84,4 +83,9 @@ public class PostgresSchemaChangeIT extends
AbstractSchemaChangeBaseIT {
container.setPortBindings(Lists.newArrayList(String.format("%s:%s",
PG_PORT, PG_PORT)));
return container;
}
+
+ @Override
+ protected String sinkDatabaseType() {
+ return DATABASE_TYPE;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
index f138ecc35b..71384c3d53 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/java/org/apache/seatunnel/connectors/jdbc/SchemaChangeCase.java
@@ -23,8 +23,6 @@ import lombok.Data;
@Data
@Builder
public class SchemaChangeCase {
-
- private String dbType;
private String driverUrl;
private String jdbcUrl;
private String driverClassName;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
index 1abcc9bbad..2f7dcb47cb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/modify_columns.sql
@@ -21,7 +21,7 @@
CREATE DATABASE IF NOT EXISTS `shop`;
use shop;
-alter table products modify name longtext null;
+alter table products modify name VARCHAR(400) null;
delete from products where id < 155;
insert into products
values (164,"scooter","Small 2-wheel scooter",3.14,1),
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
index eb258d1840..0eb9d6d787 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/ddl/shop.sql
@@ -25,7 +25,7 @@ drop table if exists products;
-- Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ name VARCHAR(150) NOT NULL DEFAULT 'SeaTunnel',
description VARCHAR(512),
weight DECIMAL(8,2)
);
@@ -33,7 +33,7 @@ CREATE TABLE products (
drop table if exists mysql_cdc_e2e_sink_table_with_schema_change;
CREATE TABLE if not exists mysql_cdc_e2e_sink_table_with_schema_change (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ name VARCHAR(150) NOT NULL DEFAULT 'SeaTunnel',
description VARCHAR(512),
weight DECIMAL(8,2)
);
@@ -41,7 +41,7 @@ CREATE TABLE if not exists
mysql_cdc_e2e_sink_table_with_schema_change (
drop table if exists mysql_cdc_e2e_sink_table_with_schema_change_exactly_once;
CREATE TABLE if not exists
mysql_cdc_e2e_sink_table_with_schema_change_exactly_once (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
- name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ name VARCHAR(150) NOT NULL DEFAULT 'SeaTunnel',
description VARCHAR(512),
weight DECIMAL(8,2)
);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change.conf
new file mode 100644
index 0000000000..a0412fa9d0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:dm://e2e_dmdb:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ generate_sink_sql = true
+ database = "DAMENG"
+ table = "SYSDBA.sink_table_with_schema_change"
+ primary_keys = ["id"]
+
+ # Validate ddl update for sink writer multi replica
+ multi_table_sink_replica = 2
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change_exactly_once.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change_exactly_once.conf
new file mode 100644
index 0000000000..7065389f05
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-ddl/src/test/resources/mysqlcdc_to_dm_with_schema_change_exactly_once.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 5
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ MySQL-CDC {
+ server-id = 5652-5657
+ username = "st_user_source"
+ password = "mysqlpw"
+ table-names = ["shop.products"]
+ base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
+
+ schema-changes.enabled = true
+ }
+}
+
+sink {
+ jdbc {
+ url = "jdbc:dm://e2e_dmdb:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ database = "DAMENG"
+ generate_sink_sql = true
+ table = "SYSDBA.sink_table_with_schema_change_exactly_once"
+ primary_keys = ["id"]
+ xa_data_source_class_name = "dm.jdbc.driver.DmdbXADataSource"
+ }
+}