This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b87d732c81 [Improve][Jdbc] Support save mode for the sink of jdbc-dm 
(#7814)
b87d732c81 is described below

commit b87d732c8156b3dcfb0b3d96c75d616f4b72bc68
Author: dailai <[email protected]>
AuthorDate: Sun Oct 13 23:59:18 2024 +0800

    [Improve][Jdbc] Support save mode for the sink of jdbc-dm (#7814)
    
    Co-authored-by: dailai <[email protected]>
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  43 +++--
 .../seatunnel/jdbc/catalog/dm/DamengCatalog.java   |  72 +++----
 .../catalog/dm/DamengCreateTableSqlBuilder.java    | 211 +++++++++++++++++++++
 .../jdbc/internal/dialect/dm/DmdbDialect.java      |  10 +-
 .../seatunnel/jdbc/catalog/PreviewActionTest.java  |  40 ++--
 .../dm/DamengCreateTableSqlBuilderTest.java        | 137 +++++++++++++
 .../seatunnel/jdbc/catalog/dm/DamengJdbcTest.java  | 156 +++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcDmIT.java        |   8 +
 .../seatunnel/jdbc/JdbcDmSaveModeIT.java           |  86 +++++++++
 .../jdbc_dm_source_and_sink_savemode.conf          |  47 +++++
 10 files changed, 730 insertions(+), 80 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 772cc3bf77..4aa4ec778e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -178,24 +178,21 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
             DatabaseMetaData metaData = conn.getMetaData();
             Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, 
tablePath);
             List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, 
tablePath);
-            try (PreparedStatement ps = 
conn.prepareStatement(getSelectColumnsSql(tablePath));
-                    ResultSet resultSet = ps.executeQuery()) {
-
-                TableSchema.Builder builder = TableSchema.builder();
-                buildColumnsWithErrorCheck(tablePath, resultSet, builder);
-                // add primary key
-                primaryKey.ifPresent(builder::primaryKey);
-                // add constraint key
-                constraintKeys.forEach(builder::constraintKey);
-                TableIdentifier tableIdentifier = 
getTableIdentifier(tablePath);
-                return CatalogTable.of(
-                        tableIdentifier,
-                        builder.build(),
-                        buildConnectorOptions(tablePath),
-                        Collections.emptyList(),
-                        "",
-                        catalogName);
-            }
+            TableSchema.Builder tableSchemaBuilder =
+                    buildColumnsReturnTablaSchemaBuilder(tablePath, conn);
+            // add primary key
+            primaryKey.ifPresent(tableSchemaBuilder::primaryKey);
+            // add constraint key
+            constraintKeys.forEach(tableSchemaBuilder::constraintKey);
+            TableIdentifier tableIdentifier = getTableIdentifier(tablePath);
+            return CatalogTable.of(
+                    tableIdentifier,
+                    tableSchemaBuilder.build(),
+                    buildConnectorOptions(tablePath),
+                    Collections.emptyList(),
+                    "",
+                    catalogName);
+
         } catch (SeaTunnelRuntimeException e) {
             throw e;
         } catch (Exception e) {
@@ -204,6 +201,16 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         }
     }
 
+    protected TableSchema.Builder buildColumnsReturnTablaSchemaBuilder(
+            TablePath tablePath, Connection conn) throws SQLException {
+        TableSchema.Builder columnsBuilder = TableSchema.builder();
+        try (PreparedStatement ps = 
conn.prepareStatement(getSelectColumnsSql(tablePath));
+                ResultSet resultSet = ps.executeQuery()) {
+            buildColumnsWithErrorCheck(tablePath, resultSet, columnsBuilder);
+        }
+        return columnsBuilder;
+    }
+
     protected void buildColumnsWithErrorCheck(
             TablePath tablePath, ResultSet resultSet, TableSchema.Builder 
builder)
             throws SQLException {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index 13dcdf0783..04b15dfe1c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -21,6 +21,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
@@ -33,6 +34,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTy
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -42,18 +44,6 @@ import java.util.List;
 @Slf4j
 public class DamengCatalog extends AbstractJdbcCatalog {
 
-    private static final String SELECT_COLUMNS_SQL =
-            "SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE, 
COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE "
-                    + ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT, 
COMMENTS.COMMENTS "
-                    + "FROM ALL_TAB_COLUMNS COLUMNS "
-                    + "LEFT JOIN ALL_COL_COMMENTS COMMENTS "
-                    + "ON COLUMNS.OWNER = COMMENTS.SCHEMA_NAME "
-                    + "AND COLUMNS.TABLE_NAME = COMMENTS.TABLE_NAME "
-                    + "AND COLUMNS.COLUMN_NAME = COMMENTS.COLUMN_NAME "
-                    + "WHERE COLUMNS.OWNER = '%s' "
-                    + "AND COLUMNS.TABLE_NAME = '%s' "
-                    + "ORDER BY COLUMNS.COLUMN_ID ASC";
-
     public DamengCatalog(
             String catalogName,
             String username,
@@ -85,7 +75,7 @@ public class DamengCatalog extends AbstractJdbcCatalog {
     @Override
     protected String getCreateTableSql(
             TablePath tablePath, CatalogTable table, boolean createIndex) {
-        throw new UnsupportedOperationException();
+        return new DamengCreateTableSqlBuilder(table, 
createIndex).build(tablePath);
     }
 
     @Override
@@ -95,7 +85,7 @@ public class DamengCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected String getTableName(TablePath tablePath) {
-        return tablePath.getSchemaAndTableName().toUpperCase();
+        return tablePath.getSchemaAndTableName("\"");
     }
 
     @Override
@@ -108,27 +98,19 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         return rs.getString(1) + "." + rs.getString(2);
     }
 
-    @Override
-    protected String getSelectColumnsSql(TablePath tablePath) {
-        return String.format(
-                SELECT_COLUMNS_SQL, tablePath.getSchemaName(), 
tablePath.getTableName());
-    }
-
     @Override
     protected Column buildColumn(ResultSet resultSet) throws SQLException {
         String columnName = resultSet.getString("COLUMN_NAME");
-        String typeName = resultSet.getString("DATA_TYPE");
-        long columnLength = resultSet.getLong("DATA_LENGTH");
-        long columnPrecision = resultSet.getLong("DATA_PRECISION");
-        int columnScale = resultSet.getInt("DATA_SCALE");
-        String columnComment = resultSet.getString("COMMENTS");
-        Object defaultValue = resultSet.getObject("DATA_DEFAULT");
-        boolean isNullable = resultSet.getString("NULLABLE").equals("Y");
-
+        String typeName = resultSet.getString("TYPE_NAME");
+        Long columnLength = resultSet.getLong("COLUMN_SIZE");
+        Long columnPrecision = columnLength;
+        Integer columnScale = resultSet.getObject("DECIMAL_DIGITS", 
Integer.class);
+        String columnComment = resultSet.getString("REMARKS");
+        Object defaultValue = resultSet.getObject("COLUMN_DEF");
+        boolean isNullable = (resultSet.getInt("NULLABLE") == 
DatabaseMetaData.columnNullable);
         BasicTypeDefine typeDefine =
                 BasicTypeDefine.builder()
                         .name(columnName)
-                        .columnType(typeName)
                         .dataType(typeName)
                         .length(columnLength)
                         .precision(columnPrecision)
@@ -150,11 +132,6 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         return tablePath.getSchemaAndTableName();
     }
 
-    private List<String> listTables() {
-        List<String> databases = listDatabases();
-        return listTables(databases.get(0));
-    }
-
     @Override
     public List<String> listTables(String databaseName)
             throws CatalogException, DatabaseNotExistException {
@@ -184,4 +161,31 @@ public class DamengCatalog extends AbstractJdbcCatalog {
         Connection defaultConnection = getConnection(defaultUrl);
         return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new 
DmdbTypeMapper());
     }
+
+    @Override
+    protected TableSchema.Builder buildColumnsReturnTablaSchemaBuilder(
+            TablePath tablePath, Connection conn) throws SQLException {
+        TableSchema.Builder columnsBuilder = TableSchema.builder();
+        DatabaseMetaData metaData = conn.getMetaData();
+        try (ResultSet resultSet =
+                metaData.getColumns(
+                        null, tablePath.getSchemaName(), 
tablePath.getTableName(), null)) {
+            buildColumnsWithErrorCheck(tablePath, resultSet, columnsBuilder);
+        }
+        return columnsBuilder;
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) {
+        return String.format(
+                "TRUNCATE TABLE \"%s\".\"%s\"",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
+
+    @Override
+    protected String getExistDataSql(TablePath tablePath) {
+        return String.format(
+                "select * from \"%s\".\"%s\" WHERE rownum = 1",
+                tablePath.getSchemaName(), tablePath.getTableName());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..98e0361702
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCreateTableSqlBuilder;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class DamengCreateTableSqlBuilder extends 
AbstractJdbcCreateTableSqlBuilder {
+    private final List<Column> columns;
+    private final PrimaryKey primaryKey;
+    private final String sourceCatalogName;
+    private final String fieldIde;
+    private final List<ConstraintKey> constraintKeys;
+    private boolean createIndex;
+
+    public DamengCreateTableSqlBuilder(CatalogTable catalogTable, boolean 
createIndex) {
+        this.columns = catalogTable.getTableSchema().getColumns();
+        this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+        this.sourceCatalogName = catalogTable.getCatalogName();
+        this.fieldIde = catalogTable.getOptions().get("fieldIde");
+        constraintKeys = catalogTable.getTableSchema().getConstraintKeys();
+        this.createIndex = createIndex;
+    }
+
+    public String build(TablePath tablePath) {
+        StringBuilder createTableSql = new StringBuilder();
+        createTableSql
+                .append("CREATE TABLE ")
+                .append(tablePath.getSchemaAndTableName("\""))
+                .append(" (\n");
+
+        List<String> columnSqls =
+                columns.stream()
+                        .map(column -> 
CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
+                        .collect(Collectors.toList());
+
+        if (createIndex
+                && primaryKey != null
+                && CollectionUtils.isNotEmpty(primaryKey.getColumnNames())) {
+            columnSqls.add(buildPrimaryKeySql(primaryKey));
+        }
+
+        if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) {
+            for (ConstraintKey constraintKey : constraintKeys) {
+                if (StringUtils.isBlank(constraintKey.getConstraintName())
+                        || (primaryKey != null
+                                && (StringUtils.equals(
+                                                primaryKey.getPrimaryKey(),
+                                                
constraintKey.getConstraintName())
+                                        || primaryContainsAllConstrainKey(
+                                                primaryKey, constraintKey)))) {
+                    continue;
+                }
+                String constraintKeySql = buildConstraintKeySql(constraintKey);
+                if (StringUtils.isNotEmpty(constraintKeySql)) {
+                    columnSqls.add("\t" + constraintKeySql);
+                }
+            }
+        }
+
+        createTableSql.append(String.join(",\n", columnSqls));
+        createTableSql.append("\n)");
+
+        List<String> commentSqls =
+                columns.stream()
+                        .filter(column -> 
StringUtils.isNotBlank(column.getComment()))
+                        .map(
+                                column ->
+                                        buildColumnCommentSql(
+                                                column, 
tablePath.getSchemaAndTableName("\"")))
+                        .collect(Collectors.toList());
+
+        if (!commentSqls.isEmpty()) {
+            createTableSql.append(";\n");
+            createTableSql.append(String.join(";\n", commentSqls));
+            createTableSql.append(";");
+        }
+
+        return createTableSql.toString();
+    }
+
+    private String buildColumnSql(Column column) {
+        StringBuilder columnSql = new StringBuilder();
+        columnSql.append("\"").append(column.getName()).append("\" ");
+
+        String columnType =
+                StringUtils.equals(DatabaseIdentifier.DAMENG, 
sourceCatalogName)
+                        ? column.getSourceType()
+                        : 
DmdbTypeConverter.INSTANCE.reconvert(column).getColumnType();
+        columnSql.append(columnType);
+
+        if (!column.isNullable()) {
+            columnSql.append(" NOT NULL");
+        }
+
+        return columnSql.toString();
+    }
+
+    private String buildPrimaryKeySql(PrimaryKey primaryKey) {
+        String randomSuffix = UUID.randomUUID().toString().replace("-", 
"").substring(0, 4);
+        String columnNamesString =
+                primaryKey.getColumnNames().stream()
+                        .map(columnName -> "\"" + columnName + "\"")
+                        .collect(Collectors.joining(", "));
+
+        String primaryKeyStr = primaryKey.getPrimaryKey();
+        if (primaryKeyStr.length() > 25) {
+            primaryKeyStr = primaryKeyStr.substring(0, 25);
+        }
+
+        return CatalogUtils.getFieldIde(
+                "CONSTRAINT "
+                        + primaryKeyStr
+                        + "_"
+                        + randomSuffix
+                        + " PRIMARY KEY ("
+                        + columnNamesString
+                        + ")",
+                fieldIde);
+    }
+
+    private String buildColumnCommentSql(Column column, String tableName) {
+        StringBuilder columnCommentSql = new StringBuilder();
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", 
fieldIde))
+                .append(CatalogUtils.quoteIdentifier(tableName, fieldIde))
+                .append(".");
+        columnCommentSql
+                .append(CatalogUtils.quoteIdentifier(column.getName(), 
fieldIde, "\""))
+                .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
+                .append(column.getComment())
+                .append("'");
+        return columnCommentSql.toString();
+    }
+
+    private String buildConstraintKeySql(ConstraintKey constraintKey) {
+        ConstraintKey.ConstraintType constraintType = 
constraintKey.getConstraintType();
+        String randomSuffix = UUID.randomUUID().toString().replace("-", 
"").substring(0, 4);
+
+        String constraintName = constraintKey.getConstraintName();
+        if (constraintName.length() > 25) {
+            constraintName = constraintName.substring(0, 25);
+        }
+        String indexColumns =
+                constraintKey.getColumnNames().stream()
+                        .map(
+                                constraintKeyColumn ->
+                                        String.format(
+                                                "\"%s\"",
+                                                CatalogUtils.getFieldIde(
+                                                        
constraintKeyColumn.getColumnName(),
+                                                        fieldIde)))
+                        .collect(Collectors.joining(", "));
+
+        String keyName;
+        switch (constraintType) {
+            case INDEX_KEY:
+                keyName = "KEY";
+                break;
+            case UNIQUE_KEY:
+                keyName = "UNIQUE";
+                break;
+            case FOREIGN_KEY:
+                keyName = "FOREIGN KEY";
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported constraint type: " + constraintType);
+        }
+
+        if (StringUtils.equals(keyName, "UNIQUE")) {
+            return "CONSTRAINT "
+                    + constraintName
+                    + "_"
+                    + randomSuffix
+                    + " UNIQUE ("
+                    + indexColumns
+                    + ")";
+        }
+        return null;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
index da652d6b14..dd3965e843 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -94,12 +94,7 @@ public class DmdbDialect implements JdbcDialect {
         // If there is a schema in the sql of dm, an error will be reported.
         // This is compatible with the case that the schema is written or not 
written in the conf
         // configuration file
-        String databaseName =
-                database == null
-                        ? quoteIdentifier(tableName)
-                        : (tableName.contains(".")
-                                ? quoteIdentifier(tableName)
-                                : tableIdentifier(database, tableName));
+        String databaseName = tableIdentifier(database, tableName);
         String upsertSQL =
                 String.format(
                         " MERGE INTO %s TARGET"
@@ -137,6 +132,9 @@ public class DmdbDialect implements JdbcDialect {
     // Compatibility Both database = mode and table-names = schema.tableName 
are configured
     @Override
     public String tableIdentifier(String database, String tableName) {
+        if (database == null) {
+            return quoteIdentifier(tableName);
+        }
         if (tableName.contains(".")) {
             return quoteIdentifier(tableName);
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
index 5f4e239d6f..06d85551a1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
@@ -109,7 +109,7 @@ public class PreviewActionTest {
         DamengCatalogFactory factory = new DamengCatalogFactory();
         Catalog catalog =
                 factory.createCatalog(
-                        "test",
+                        "Dameng",
                         ReadonlyConfig.fromMap(
                                 new HashMap<String, Object>() {
                                     {
@@ -124,7 +124,7 @@ public class PreviewActionTest {
                         assertPreviewResult(
                                 catalog,
                                 Catalog.ActionType.CREATE_DATABASE,
-                                "CREATE DATABASE `testddatabase`;",
+                                "CREATE DATABASE \"testddatabase\";",
                                 Optional.empty()));
         Assertions.assertThrows(
                 UnsupportedOperationException.class,
@@ -132,28 +132,24 @@ public class PreviewActionTest {
                         assertPreviewResult(
                                 catalog,
                                 Catalog.ActionType.DROP_DATABASE,
-                                "DROP DATABASE `testddatabase`;",
-                                Optional.empty()));
-        Assertions.assertThrows(
-                UnsupportedOperationException.class,
-                () ->
-                        assertPreviewResult(
-                                catalog,
-                                Catalog.ActionType.TRUNCATE_TABLE,
-                                "TRUNCATE TABLE `testddatabase`.`testtable`;",
+                                "DROP DATABASE \"testddatabase\";",
                                 Optional.empty()));
         assertPreviewResult(
-                catalog, Catalog.ActionType.DROP_TABLE, "DROP TABLE 
TESTTABLE", Optional.empty());
-        Assertions.assertThrows(
-                UnsupportedOperationException.class,
-                () ->
-                        assertPreviewResult(
-                                catalog,
-                                Catalog.ActionType.CREATE_TABLE,
-                                "CREATE TABLE `testtable` (\n"
-                                        + "\t`test` LONGTEXT NULL COMMENT ''\n"
-                                        + ") COMMENT = 'comment';",
-                                Optional.of(CATALOG_TABLE)));
+                catalog,
+                Catalog.ActionType.TRUNCATE_TABLE,
+                "TRUNCATE TABLE \"null\".\"testtable\"",
+                Optional.empty());
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.DROP_TABLE,
+                "DROP TABLE \"testtable\"",
+                Optional.empty());
+
+        assertPreviewResult(
+                catalog,
+                Catalog.ActionType.CREATE_TABLE,
+                "CREATE TABLE \"testtable\" (\n" + "\"test\" TEXT\n" + ")",
+                Optional.of(CATALOG_TABLE));
     }
 
     @Test
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilderTest.java
new file mode 100644
index 0000000000..d845b8770e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilderTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class DamengCreateTableSqlBuilderTest {
+
+    @Test
+    public void TestCreateTableSqlBuilder() {
+        TablePath tablePath = TablePath.of("test_database", "test_schema", 
"test_table");
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .column(PhysicalColumn.of("id", BasicType.LONG_TYPE, 
22, false, null, "id"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "name", BasicType.STRING_TYPE, 128, 
false, null, "name"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "age", BasicType.INT_TYPE, (Long) 
null, true, null, "age"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "createTime",
+                                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                        3,
+                                        true,
+                                        null,
+                                        "createTime"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "lastUpdateTime",
+                                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                        3,
+                                        true,
+                                        null,
+                                        "lastUpdateTime"))
+                        .primaryKey(PrimaryKey.of("id", 
Lists.newArrayList("id")))
+                        .constraintKey(
+                                Arrays.asList(
+                                        ConstraintKey.of(
+                                                
ConstraintKey.ConstraintType.UNIQUE_KEY,
+                                                "name",
+                                                Lists.newArrayList(
+                                                        
ConstraintKey.ConstraintKeyColumn.of(
+                                                                "name", 
null))),
+                                        ConstraintKey.of(
+                                                
ConstraintKey.ConstraintType.INDEX_KEY,
+                                                "age",
+                                                Lists.newArrayList(
+                                                        
ConstraintKey.ConstraintKeyColumn.of(
+                                                                "age", 
null)))))
+                        .build();
+
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("test_catalog", tablePath),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "User table");
+
+        String createTableSql =
+                new DamengCreateTableSqlBuilder(catalogTable, 
true).build(tablePath);
+        String expect =
+                "CREATE TABLE \"test_schema\".\"test_table\" (\n"
+                        + "\"id\" BIGINT NOT NULL,\n"
+                        + "\"name\" VARCHAR2(128) NOT NULL,\n"
+                        + "\"age\" INT,\n"
+                        + "\"createTime\" TIMESTAMP,\n"
+                        + "\"lastUpdateTime\" TIMESTAMP,\n"
+                        + "CONSTRAINT id_63d5 PRIMARY KEY (\"id\"),\n"
+                        + "\tCONSTRAINT name_49b6 UNIQUE (\"name\")\n"
+                        + ");\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"id\" IS 'id';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"name\" IS 'name';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"age\" IS 'age';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"createTime\" IS 'createTime';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"lastUpdateTime\" IS 'lastUpdateTime';";
+
+        String regex1 = "id_\\w+";
+        String regex2 = "name_\\w+";
+        String replacedStr1 = createTableSql.replaceAll(regex1, 
"id_").replaceAll(regex2, "name_");
+        String replacedStr2 = expect.replaceAll(regex1, 
"id_").replaceAll(regex2, "name_");
+        Assertions.assertEquals(replacedStr2, replacedStr1);
+
+        // skip index
+        String createTableSqlSkipIndex =
+                new DamengCreateTableSqlBuilder(catalogTable, 
false).build(tablePath);
+        // The create-table SQL has changed; the old unit-test expectations are no longer 
applicable
+        String expectSkipIndex =
+                "CREATE TABLE \"test_schema\".\"test_table\" (\n"
+                        + "\"id\" BIGINT NOT NULL,\n"
+                        + "\"name\" VARCHAR2(128) NOT NULL,\n"
+                        + "\"age\" INT,\n"
+                        + "\"createTime\" TIMESTAMP,\n"
+                        + "\"lastUpdateTime\" TIMESTAMP\n"
+                        + ");\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"id\" IS 'id';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"name\" IS 'name';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"age\" IS 'age';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"createTime\" IS 'createTime';\n"
+                        + "COMMENT ON COLUMN 
\"test_schema\".\"test_table\".\"lastUpdateTime\" IS 'lastUpdateTime';";
+        Assertions.assertEquals(expectSkipIndex, createTableSqlSkipIndex);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
new file mode 100644
index 0000000000..b0f6d42235
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Disabled("Please Test it in your local environment") // skipped by default; the tests below talk to the database at DM_URL_INFO
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class) // tests execute in ascending @Order; later tests depend on earlier ones
+public class DamengJdbcTest {
+
+    private static final JdbcUrlUtil.UrlInfo DM_URL_INFO =
+            JdbcUrlUtil.getUrlInfo("jdbc:dm://172.16.17.156:30236"); // NOTE(review): environment-specific address; adjust before running locally
+
+    private static final String DATABASE_NAME = "DAMENG";
+    private static final String SCHEMA_NAME = "DM_USER01";
+    private static final String TABLE_NAME = "STUDENT_INFO";
+
+    private static final TablePath TABLE_PATH_DM =
+            TablePath.of(DATABASE_NAME, SCHEMA_NAME, TABLE_NAME);
+
+    private static DamengCatalog DAMENG_CATALOG; // shared catalog, created and opened once in before()
+
+    private static CatalogTable DM_CATALOGTABLE; // definition of TABLE_PATH_DM, assigned in createTableInternal()
+
+    @BeforeAll
+    static void before() { // NOTE(review): user name and password are hard-coded literals below
+        DAMENG_CATALOG =
+                new DamengCatalog("DAMENG_CATALOG", "DM_USER01", "Te$Dt_1234", 
DM_URL_INFO, null);
+        DAMENG_CATALOG.open();
+    }
+
+    @Test
+    @Order(1)
+    void exists() { // precondition for the later tests: the database and the source table must already exist
+        Assertions.assertTrue(DAMENG_CATALOG.databaseExists(DATABASE_NAME));
+        Assertions.assertTrue(DAMENG_CATALOG.tableExists(TABLE_PATH_DM));
+    }
+
+    @Test
+    @Order(2)
+    void createTableInternal() { // loads the definition of TABLE_PATH_DM and creates TABLE_NAME + "_test" from it
+        Assertions.assertDoesNotThrow(
+                () -> DM_CATALOGTABLE = 
DAMENG_CATALOG.getTable(TABLE_PATH_DM));
+        Assertions.assertDoesNotThrow(
+                () ->
+                        DAMENG_CATALOG.createTable(
+                                TablePath.of(DATABASE_NAME, SCHEMA_NAME, 
TABLE_NAME + "_test"),
+                                DM_CATALOGTABLE,
+                                false,
+                                true));
+    }
+
+    @Test
+    @Order(3)
+    void dropTableInternal() { // drops the same TABLE_NAME + "_test" path that createTableInternal() (Order 2) creates
+        Assertions.assertDoesNotThrow(
+                () ->
+                        DAMENG_CATALOG.dropTable(
+                                TablePath.of(DATABASE_NAME, SCHEMA_NAME, 
TABLE_NAME + "_test"),
+                                false));
+    }
+
+    @Test
+    @Order(4)
+    void createDatabaseInternal() { // TABLE_PATH_DM: no error with flag true, DatabaseAlreadyExistException with false
+        Assertions.assertDoesNotThrow(() -> 
DAMENG_CATALOG.createDatabase(TABLE_PATH_DM, true));
+        Assertions.assertThrows(
+                DatabaseAlreadyExistException.class,
+                () -> DAMENG_CATALOG.createDatabase(TABLE_PATH_DM, false));
+        RuntimeException catalogException = // "test_db" path: RuntimeException caused by UnsupportedOperationException, for either flag
+                Assertions.assertThrows(
+                        RuntimeException.class,
+                        () ->
+                                DAMENG_CATALOG.createDatabase(
+                                        TablePath.of("test_db.test.test1"), 
true));
+        Assertions.assertInstanceOf(
+                UnsupportedOperationException.class, 
catalogException.getCause());
+        RuntimeException runtimeException =
+                Assertions.assertThrows(
+                        RuntimeException.class,
+                        () ->
+                                DAMENG_CATALOG.createDatabase(
+                                        TablePath.of("test_db.test.test1"), 
false));
+        Assertions.assertInstanceOf(
+                UnsupportedOperationException.class, 
runtimeException.getCause());
+    }
+
+    @Test
+    @Order(5)
+    void dropDatabaseInternal() { // "test_db" path: no error with flag true, DatabaseNotExistException with false
+        Assertions.assertDoesNotThrow(
+                () -> 
DAMENG_CATALOG.dropDatabase(TablePath.of("test_db.test.test1"), true));
+        Assertions.assertThrows(
+                DatabaseNotExistException.class,
+                () -> 
DAMENG_CATALOG.dropDatabase(TablePath.of("test_db.test.test1"), false));
+        RuntimeException runtimeException = // TABLE_PATH_DM: RuntimeException caused by UnsupportedOperationException, for either flag
+                Assertions.assertThrows(
+                        RuntimeException.class,
+                        () -> DAMENG_CATALOG.dropDatabase(TABLE_PATH_DM, 
true));
+        Assertions.assertInstanceOf(
+                UnsupportedOperationException.class, 
runtimeException.getCause());
+        RuntimeException catalogException =
+                Assertions.assertThrows(
+                        RuntimeException.class,
+                        () -> DAMENG_CATALOG.dropDatabase(TABLE_PATH_DM, 
false));
+        Assertions.assertInstanceOf(
+                UnsupportedOperationException.class, 
catalogException.getCause());
+    }
+
+    @Test
+    @Order(6)
+    void truncateTableInternal() { // truncating TABLE_PATH_DM must not throw for either flag value
+        Assertions.assertDoesNotThrow(() -> 
DAMENG_CATALOG.truncateTable(TABLE_PATH_DM, false));
+        Assertions.assertDoesNotThrow(() -> 
DAMENG_CATALOG.truncateTable(TABLE_PATH_DM, true));
+    }
+
+    @Test
+    @Order(7)
+    void listTablesInternal() { // only checks that listing does not throw; the returned list is not inspected
+        Assertions.assertDoesNotThrow(() -> 
DAMENG_CATALOG.listTables(DATABASE_NAME));
+    }
+
+    @Test
+    @Order(8)
+    void existsData() { // relies on Order(6) having truncated TABLE_PATH_DM; NOTE(review): assumes DAMENG.HIS.DEPARTMENTS holds rows - confirm locally
+        Assertions.assertFalse(DAMENG_CATALOG.isExistsData(TABLE_PATH_DM));
+        
Assertions.assertTrue(DAMENG_CATALOG.isExistsData(TablePath.of("DAMENG.HIS.DEPARTMENTS")));
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
index 9bceb31a0d..e845bb1ebf 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
@@ -220,6 +220,14 @@ public class JdbcDmIT extends AbstractJdbcIT {
         return Pair.of(fieldNames, rows);
     }
 
+    protected String buildTableInfoWithSchema(String database, String schema, 
String table) {
+        return buildTableInfoWithSchema(schema, table); // database is not used; delegates to the (schema, table) overload
+    }
+
+    protected void clearTable(String database, String schema, String table) {
+        clearTable(schema, table); // database is not used; delegates to the (schema, table) overload
+    }
+
     @Override
     protected GenericContainer<?> initContainer() {
         GenericContainer<?> container =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmSaveModeIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmSaveModeIT.java
new file mode 100644
index 0000000000..1a1f23ce58
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmSaveModeIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class JdbcDmSaveModeIT extends JdbcDmIT { // reuses the JdbcDmIT case with save-mode table creation switched on
+
+    private static final String CREATE_SQL = // DDL template; %s is the placeholder for the table identifier
+            "create table if not exists %s"
+                    + "(\n"
+                    + "    DM_BIT              BIT,\n"
+                    + "    DM_INT              INT,\n"
+                    + "    DM_INTEGER          INTEGER,\n"
+                    + "    DM_PLS_INTEGER      PLS_INTEGER,\n"
+                    + "    DM_TINYINT          TINYINT,\n"
+                    + "\n"
+                    + "    DM_BYTE             BYTE,\n"
+                    + "    DM_SMALLINT         SMALLINT,\n"
+                    + "    DM_BIGINT           BIGINT,\n"
+                    + "\n"
+                    + "    DM_NUMERIC          NUMERIC,\n"
+                    + "    DM_NUMBER           NUMBER,\n"
+                    + "    DM_DECIMAL          DECIMAL,\n"
+                    + "    DM_DEC              DEC,\n"
+                    + "\n"
+                    + "    DM_REAL             REAL,\n"
+                    + "    DM_FLOAT            FLOAT,\n"
+                    + "    DM_DOUBLE_PRECISION DOUBLE PRECISION,\n"
+                    + "    DM_DOUBLE           DOUBLE,\n"
+                    + "\n"
+                    + "    DM_CHAR             CHAR,\n"
+                    + "    DM_CHARACTER        CHARACTER,\n"
+                    + "    DM_VARCHAR          VARCHAR,\n"
+                    + "    DM_VARCHAR2         VARCHAR2,\n"
+                    + "    DM_TEXT             TEXT,\n"
+                    + "    DM_LONG             LONG,\n"
+                    + "    DM_LONGVARCHAR      LONGVARCHAR,\n"
+                    + "    DM_CLOB             CLOB,\n"
+                    + "\n"
+                    + "    DM_TIMESTAMP        TIMESTAMP,\n"
+                    + "    DM_DATETIME         DATETIME,\n"
+                    + "    DM_DATE             DATE,\n"
+                    + "\n"
+                    + "    DM_BLOB             BLOB,\n"
+                    + "    DM_BINARY           BINARY,\n"
+                    + "    DM_VARBINARY        VARBINARY,\n"
+                    + "    DM_LONGVARBINARY    LONGVARBINARY,\n"
+                    + "    DM_IMAGE            IMAGE,\n"
+                    + "    DM_BFILE            BFILE,\n"
+                    + "    constraint PK_T_COL primary key (\"DM_INT\")"
+                    + ")";
+
+    private static final String DM_SINK = "e2e_table_sink1"; // same table name as the sink `table` option in the config file below
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_dm_source_and_sink_savemode.conf"); // job config used by this IT
+
+    @Override
+    JdbcCase getJdbcCase() { // starts from the parent's case and overrides only the save-mode specific settings
+        JdbcCase jdbcCase = super.getJdbcCase();
+        jdbcCase.setUseSaveModeCreateTable(true); // presumably lets the sink's save mode create the table - confirm in AbstractJdbcIT
+        jdbcCase.setSinkTable(DM_SINK);
+        jdbcCase.setConfigFile(CONFIG_FILE);
+        jdbcCase.setCreateSql(CREATE_SQL);
+        return jdbcCase;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_dm_source_and_sink_savemode.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_dm_source_and_sink_savemode.conf
new file mode 100644
index 0000000000..e9d2c00f76
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_dm_source_and_sink_savemode.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:dm://e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    query = """select * from "SYSDBA".e2e_table_source"""
+  }
+
+}
+
+sink {
+  Jdbc {
+    url = "jdbc:dm://e2e_dmdb:5236"
+    driver = "dm.jdbc.driver.DmDriver"
+    connection_check_timeout_sec = 1000
+    user = "SYSDBA"
+    password = "SYSDBA"
+    database = "DAMENG"
+    table = "SYSDBA.e2e_table_sink1"
+    generate_sink_sql = true
+  }
+}
+


Reply via email to