This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b87d732c81 [Improve][Jdbc] Support save mode for the sink of jdbc-dm
(#7814)
b87d732c81 is described below
commit b87d732c8156b3dcfb0b3d96c75d616f4b72bc68
Author: dailai <[email protected]>
AuthorDate: Sun Oct 13 23:59:18 2024 +0800
[Improve][Jdbc] Support save mode for the sink of jdbc-dm (#7814)
Co-authored-by: dailai <[email protected]>
---
.../jdbc/catalog/AbstractJdbcCatalog.java | 43 +++--
.../seatunnel/jdbc/catalog/dm/DamengCatalog.java | 72 +++----
.../catalog/dm/DamengCreateTableSqlBuilder.java | 211 +++++++++++++++++++++
.../jdbc/internal/dialect/dm/DmdbDialect.java | 10 +-
.../seatunnel/jdbc/catalog/PreviewActionTest.java | 40 ++--
.../dm/DamengCreateTableSqlBuilderTest.java | 137 +++++++++++++
.../seatunnel/jdbc/catalog/dm/DamengJdbcTest.java | 156 +++++++++++++++
.../connectors/seatunnel/jdbc/JdbcDmIT.java | 8 +
.../seatunnel/jdbc/JdbcDmSaveModeIT.java | 86 +++++++++
.../jdbc_dm_source_and_sink_savemode.conf | 47 +++++
10 files changed, 730 insertions(+), 80 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 772cc3bf77..4aa4ec778e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -178,24 +178,21 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
DatabaseMetaData metaData = conn.getMetaData();
Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData,
tablePath);
List<ConstraintKey> constraintKeys = getConstraintKeys(metaData,
tablePath);
- try (PreparedStatement ps =
conn.prepareStatement(getSelectColumnsSql(tablePath));
- ResultSet resultSet = ps.executeQuery()) {
-
- TableSchema.Builder builder = TableSchema.builder();
- buildColumnsWithErrorCheck(tablePath, resultSet, builder);
- // add primary key
- primaryKey.ifPresent(builder::primaryKey);
- // add constraint key
- constraintKeys.forEach(builder::constraintKey);
- TableIdentifier tableIdentifier =
getTableIdentifier(tablePath);
- return CatalogTable.of(
- tableIdentifier,
- builder.build(),
- buildConnectorOptions(tablePath),
- Collections.emptyList(),
- "",
- catalogName);
- }
+ TableSchema.Builder tableSchemaBuilder =
+ buildColumnsReturnTablaSchemaBuilder(tablePath, conn);
+ // add primary key
+ primaryKey.ifPresent(tableSchemaBuilder::primaryKey);
+ // add constraint key
+ constraintKeys.forEach(tableSchemaBuilder::constraintKey);
+ TableIdentifier tableIdentifier = getTableIdentifier(tablePath);
+ return CatalogTable.of(
+ tableIdentifier,
+ tableSchemaBuilder.build(),
+ buildConnectorOptions(tablePath),
+ Collections.emptyList(),
+ "",
+ catalogName);
+
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
@@ -204,6 +201,16 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
}
}
+ protected TableSchema.Builder buildColumnsReturnTablaSchemaBuilder(
+ TablePath tablePath, Connection conn) throws SQLException {
+ TableSchema.Builder columnsBuilder = TableSchema.builder();
+ try (PreparedStatement ps =
conn.prepareStatement(getSelectColumnsSql(tablePath));
+ ResultSet resultSet = ps.executeQuery()) {
+ buildColumnsWithErrorCheck(tablePath, resultSet, columnsBuilder);
+ }
+ return columnsBuilder;
+ }
+
protected void buildColumnsWithErrorCheck(
TablePath tablePath, ResultSet resultSet, TableSchema.Builder
builder)
throws SQLException {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
index 13dcdf0783..04b15dfe1c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
@@ -21,6 +21,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
@@ -33,6 +34,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTy
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -42,18 +44,6 @@ import java.util.List;
@Slf4j
public class DamengCatalog extends AbstractJdbcCatalog {
- private static final String SELECT_COLUMNS_SQL =
- "SELECT COLUMNS.COLUMN_NAME, COLUMNS.DATA_TYPE,
COLUMNS.DATA_LENGTH, COLUMNS.DATA_PRECISION, COLUMNS.DATA_SCALE "
- + ", COLUMNS.NULLABLE, COLUMNS.DATA_DEFAULT,
COMMENTS.COMMENTS "
- + "FROM ALL_TAB_COLUMNS COLUMNS "
- + "LEFT JOIN ALL_COL_COMMENTS COMMENTS "
- + "ON COLUMNS.OWNER = COMMENTS.SCHEMA_NAME "
- + "AND COLUMNS.TABLE_NAME = COMMENTS.TABLE_NAME "
- + "AND COLUMNS.COLUMN_NAME = COMMENTS.COLUMN_NAME "
- + "WHERE COLUMNS.OWNER = '%s' "
- + "AND COLUMNS.TABLE_NAME = '%s' "
- + "ORDER BY COLUMNS.COLUMN_ID ASC";
-
public DamengCatalog(
String catalogName,
String username,
@@ -85,7 +75,7 @@ public class DamengCatalog extends AbstractJdbcCatalog {
@Override
protected String getCreateTableSql(
TablePath tablePath, CatalogTable table, boolean createIndex) {
- throw new UnsupportedOperationException();
+ return new DamengCreateTableSqlBuilder(table,
createIndex).build(tablePath);
}
@Override
@@ -95,7 +85,7 @@ public class DamengCatalog extends AbstractJdbcCatalog {
@Override
protected String getTableName(TablePath tablePath) {
- return tablePath.getSchemaAndTableName().toUpperCase();
+ return tablePath.getSchemaAndTableName("\"");
}
@Override
@@ -108,27 +98,19 @@ public class DamengCatalog extends AbstractJdbcCatalog {
return rs.getString(1) + "." + rs.getString(2);
}
- @Override
- protected String getSelectColumnsSql(TablePath tablePath) {
- return String.format(
- SELECT_COLUMNS_SQL, tablePath.getSchemaName(),
tablePath.getTableName());
- }
-
@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
- String typeName = resultSet.getString("DATA_TYPE");
- long columnLength = resultSet.getLong("DATA_LENGTH");
- long columnPrecision = resultSet.getLong("DATA_PRECISION");
- int columnScale = resultSet.getInt("DATA_SCALE");
- String columnComment = resultSet.getString("COMMENTS");
- Object defaultValue = resultSet.getObject("DATA_DEFAULT");
- boolean isNullable = resultSet.getString("NULLABLE").equals("Y");
-
+ String typeName = resultSet.getString("TYPE_NAME");
+ Long columnLength = resultSet.getLong("COLUMN_SIZE");
+ Long columnPrecision = columnLength;
+ Integer columnScale = resultSet.getObject("DECIMAL_DIGITS",
Integer.class);
+ String columnComment = resultSet.getString("REMARKS");
+ Object defaultValue = resultSet.getObject("COLUMN_DEF");
+ boolean isNullable = (resultSet.getInt("NULLABLE") ==
DatabaseMetaData.columnNullable);
BasicTypeDefine typeDefine =
BasicTypeDefine.builder()
.name(columnName)
- .columnType(typeName)
.dataType(typeName)
.length(columnLength)
.precision(columnPrecision)
@@ -150,11 +132,6 @@ public class DamengCatalog extends AbstractJdbcCatalog {
return tablePath.getSchemaAndTableName();
}
- private List<String> listTables() {
- List<String> databases = listDatabases();
- return listTables(databases.get(0));
- }
-
@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
@@ -184,4 +161,31 @@ public class DamengCatalog extends AbstractJdbcCatalog {
Connection defaultConnection = getConnection(defaultUrl);
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new
DmdbTypeMapper());
}
+
+ @Override
+ protected TableSchema.Builder buildColumnsReturnTablaSchemaBuilder(
+ TablePath tablePath, Connection conn) throws SQLException {
+ TableSchema.Builder columnsBuilder = TableSchema.builder();
+ DatabaseMetaData metaData = conn.getMetaData();
+ try (ResultSet resultSet =
+ metaData.getColumns(
+ null, tablePath.getSchemaName(),
tablePath.getTableName(), null)) {
+ buildColumnsWithErrorCheck(tablePath, resultSet, columnsBuilder);
+ }
+ return columnsBuilder;
+ }
+
+ @Override
+ protected String getTruncateTableSql(TablePath tablePath) {
+ return String.format(
+ "TRUNCATE TABLE \"%s\".\"%s\"",
+ tablePath.getSchemaName(), tablePath.getTableName());
+ }
+
+ @Override
+ protected String getExistDataSql(TablePath tablePath) {
+ return String.format(
+ "select * from \"%s\".\"%s\" WHERE rownum = 1",
+ tablePath.getSchemaName(), tablePath.getTableName());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilder.java
new file mode 100644
index 0000000000..98e0361702
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCreateTableSqlBuilder;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class DamengCreateTableSqlBuilder extends
AbstractJdbcCreateTableSqlBuilder {
+ private final List<Column> columns;
+ private final PrimaryKey primaryKey;
+ private final String sourceCatalogName;
+ private final String fieldIde;
+ private final List<ConstraintKey> constraintKeys;
+ private boolean createIndex;
+
+ public DamengCreateTableSqlBuilder(CatalogTable catalogTable, boolean
createIndex) {
+ this.columns = catalogTable.getTableSchema().getColumns();
+ this.primaryKey = catalogTable.getTableSchema().getPrimaryKey();
+ this.sourceCatalogName = catalogTable.getCatalogName();
+ this.fieldIde = catalogTable.getOptions().get("fieldIde");
+ constraintKeys = catalogTable.getTableSchema().getConstraintKeys();
+ this.createIndex = createIndex;
+ }
+
+ public String build(TablePath tablePath) {
+ StringBuilder createTableSql = new StringBuilder();
+ createTableSql
+ .append("CREATE TABLE ")
+ .append(tablePath.getSchemaAndTableName("\""))
+ .append(" (\n");
+
+ List<String> columnSqls =
+ columns.stream()
+ .map(column ->
CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde))
+ .collect(Collectors.toList());
+
+ if (createIndex
+ && primaryKey != null
+ && CollectionUtils.isNotEmpty(primaryKey.getColumnNames())) {
+ columnSqls.add(buildPrimaryKeySql(primaryKey));
+ }
+
+ if (createIndex && CollectionUtils.isNotEmpty(constraintKeys)) {
+ for (ConstraintKey constraintKey : constraintKeys) {
+ if (StringUtils.isBlank(constraintKey.getConstraintName())
+ || (primaryKey != null
+ && (StringUtils.equals(
+ primaryKey.getPrimaryKey(),
+
constraintKey.getConstraintName())
+ || primaryContainsAllConstrainKey(
+ primaryKey, constraintKey)))) {
+ continue;
+ }
+ String constraintKeySql = buildConstraintKeySql(constraintKey);
+ if (StringUtils.isNotEmpty(constraintKeySql)) {
+ columnSqls.add("\t" + constraintKeySql);
+ }
+ }
+ }
+
+ createTableSql.append(String.join(",\n", columnSqls));
+ createTableSql.append("\n)");
+
+ List<String> commentSqls =
+ columns.stream()
+ .filter(column ->
StringUtils.isNotBlank(column.getComment()))
+ .map(
+ column ->
+ buildColumnCommentSql(
+ column,
tablePath.getSchemaAndTableName("\"")))
+ .collect(Collectors.toList());
+
+ if (!commentSqls.isEmpty()) {
+ createTableSql.append(";\n");
+ createTableSql.append(String.join(";\n", commentSqls));
+ createTableSql.append(";");
+ }
+
+ return createTableSql.toString();
+ }
+
+ private String buildColumnSql(Column column) {
+ StringBuilder columnSql = new StringBuilder();
+ columnSql.append("\"").append(column.getName()).append("\" ");
+
+ String columnType =
+ StringUtils.equals(DatabaseIdentifier.DAMENG,
sourceCatalogName)
+ ? column.getSourceType()
+ :
DmdbTypeConverter.INSTANCE.reconvert(column).getColumnType();
+ columnSql.append(columnType);
+
+ if (!column.isNullable()) {
+ columnSql.append(" NOT NULL");
+ }
+
+ return columnSql.toString();
+ }
+
+ private String buildPrimaryKeySql(PrimaryKey primaryKey) {
+ String randomSuffix = UUID.randomUUID().toString().replace("-",
"").substring(0, 4);
+ String columnNamesString =
+ primaryKey.getColumnNames().stream()
+ .map(columnName -> "\"" + columnName + "\"")
+ .collect(Collectors.joining(", "));
+
+ String primaryKeyStr = primaryKey.getPrimaryKey();
+ if (primaryKeyStr.length() > 25) {
+ primaryKeyStr = primaryKeyStr.substring(0, 25);
+ }
+
+ return CatalogUtils.getFieldIde(
+ "CONSTRAINT "
+ + primaryKeyStr
+ + "_"
+ + randomSuffix
+ + " PRIMARY KEY ("
+ + columnNamesString
+ + ")",
+ fieldIde);
+ }
+
+ private String buildColumnCommentSql(Column column, String tableName) {
+ StringBuilder columnCommentSql = new StringBuilder();
+ columnCommentSql
+ .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ",
fieldIde))
+ .append(CatalogUtils.quoteIdentifier(tableName, fieldIde))
+ .append(".");
+ columnCommentSql
+ .append(CatalogUtils.quoteIdentifier(column.getName(),
fieldIde, "\""))
+ .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde))
+ .append(column.getComment())
+ .append("'");
+ return columnCommentSql.toString();
+ }
+
+ private String buildConstraintKeySql(ConstraintKey constraintKey) {
+ ConstraintKey.ConstraintType constraintType =
constraintKey.getConstraintType();
+ String randomSuffix = UUID.randomUUID().toString().replace("-",
"").substring(0, 4);
+
+ String constraintName = constraintKey.getConstraintName();
+ if (constraintName.length() > 25) {
+ constraintName = constraintName.substring(0, 25);
+ }
+ String indexColumns =
+ constraintKey.getColumnNames().stream()
+ .map(
+ constraintKeyColumn ->
+ String.format(
+ "\"%s\"",
+ CatalogUtils.getFieldIde(
+
constraintKeyColumn.getColumnName(),
+ fieldIde)))
+ .collect(Collectors.joining(", "));
+
+ String keyName;
+ switch (constraintType) {
+ case INDEX_KEY:
+ keyName = "KEY";
+ break;
+ case UNIQUE_KEY:
+ keyName = "UNIQUE";
+ break;
+ case FOREIGN_KEY:
+ keyName = "FOREIGN KEY";
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported constraint type: " + constraintType);
+ }
+
+ if (StringUtils.equals(keyName, "UNIQUE")) {
+ return "CONSTRAINT "
+ + constraintName
+ + "_"
+ + randomSuffix
+ + " UNIQUE ("
+ + indexColumns
+ + ")";
+ }
+ return null;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
index da652d6b14..dd3965e843 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialect.java
@@ -94,12 +94,7 @@ public class DmdbDialect implements JdbcDialect {
// If there is a schema in the sql of dm, an error will be reported.
// This is compatible with the case that the schema is written or not
written in the conf
// configuration file
- String databaseName =
- database == null
- ? quoteIdentifier(tableName)
- : (tableName.contains(".")
- ? quoteIdentifier(tableName)
- : tableIdentifier(database, tableName));
+ String databaseName = tableIdentifier(database, tableName);
String upsertSQL =
String.format(
" MERGE INTO %s TARGET"
@@ -137,6 +132,9 @@ public class DmdbDialect implements JdbcDialect {
// Compatibility Both database = mode and table-names = schema.tableName
are configured
@Override
public String tableIdentifier(String database, String tableName) {
+ if (database == null) {
+ return quoteIdentifier(tableName);
+ }
if (tableName.contains(".")) {
return quoteIdentifier(tableName);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
index 5f4e239d6f..06d85551a1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/PreviewActionTest.java
@@ -109,7 +109,7 @@ public class PreviewActionTest {
DamengCatalogFactory factory = new DamengCatalogFactory();
Catalog catalog =
factory.createCatalog(
- "test",
+ "Dameng",
ReadonlyConfig.fromMap(
new HashMap<String, Object>() {
{
@@ -124,7 +124,7 @@ public class PreviewActionTest {
assertPreviewResult(
catalog,
Catalog.ActionType.CREATE_DATABASE,
- "CREATE DATABASE `testddatabase`;",
+ "CREATE DATABASE \"testddatabase\";",
Optional.empty()));
Assertions.assertThrows(
UnsupportedOperationException.class,
@@ -132,28 +132,24 @@ public class PreviewActionTest {
assertPreviewResult(
catalog,
Catalog.ActionType.DROP_DATABASE,
- "DROP DATABASE `testddatabase`;",
- Optional.empty()));
- Assertions.assertThrows(
- UnsupportedOperationException.class,
- () ->
- assertPreviewResult(
- catalog,
- Catalog.ActionType.TRUNCATE_TABLE,
- "TRUNCATE TABLE `testddatabase`.`testtable`;",
+ "DROP DATABASE \"testddatabase\";",
Optional.empty()));
assertPreviewResult(
- catalog, Catalog.ActionType.DROP_TABLE, "DROP TABLE
TESTTABLE", Optional.empty());
- Assertions.assertThrows(
- UnsupportedOperationException.class,
- () ->
- assertPreviewResult(
- catalog,
- Catalog.ActionType.CREATE_TABLE,
- "CREATE TABLE `testtable` (\n"
- + "\t`test` LONGTEXT NULL COMMENT ''\n"
- + ") COMMENT = 'comment';",
- Optional.of(CATALOG_TABLE)));
+ catalog,
+ Catalog.ActionType.TRUNCATE_TABLE,
+ "TRUNCATE TABLE \"null\".\"testtable\"",
+ Optional.empty());
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.DROP_TABLE,
+ "DROP TABLE \"testtable\"",
+ Optional.empty());
+
+ assertPreviewResult(
+ catalog,
+ Catalog.ActionType.CREATE_TABLE,
+ "CREATE TABLE \"testtable\" (\n" + "\"test\" TEXT\n" + ")",
+ Optional.of(CATALOG_TABLE));
}
@Test
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilderTest.java
new file mode 100644
index 0000000000..d845b8770e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCreateTableSqlBuilderTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class DamengCreateTableSqlBuilderTest {
+
+ @Test
+ public void TestCreateTableSqlBuilder() {
+ TablePath tablePath = TablePath.of("test_database", "test_schema",
"test_table");
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .column(PhysicalColumn.of("id", BasicType.LONG_TYPE,
22, false, null, "id"))
+ .column(
+ PhysicalColumn.of(
+ "name", BasicType.STRING_TYPE, 128,
false, null, "name"))
+ .column(
+ PhysicalColumn.of(
+ "age", BasicType.INT_TYPE, (Long)
null, true, null, "age"))
+ .column(
+ PhysicalColumn.of(
+ "createTime",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ 3,
+ true,
+ null,
+ "createTime"))
+ .column(
+ PhysicalColumn.of(
+ "lastUpdateTime",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ 3,
+ true,
+ null,
+ "lastUpdateTime"))
+ .primaryKey(PrimaryKey.of("id",
Lists.newArrayList("id")))
+ .constraintKey(
+ Arrays.asList(
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType.UNIQUE_KEY,
+ "name",
+ Lists.newArrayList(
+
ConstraintKey.ConstraintKeyColumn.of(
+ "name",
null))),
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType.INDEX_KEY,
+ "age",
+ Lists.newArrayList(
+
ConstraintKey.ConstraintKeyColumn.of(
+ "age",
null)))))
+ .build();
+
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("test_catalog", tablePath),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "User table");
+
+ String createTableSql =
+ new DamengCreateTableSqlBuilder(catalogTable,
true).build(tablePath);
+ String expect =
+ "CREATE TABLE \"test_schema\".\"test_table\" (\n"
+ + "\"id\" BIGINT NOT NULL,\n"
+ + "\"name\" VARCHAR2(128) NOT NULL,\n"
+ + "\"age\" INT,\n"
+ + "\"createTime\" TIMESTAMP,\n"
+ + "\"lastUpdateTime\" TIMESTAMP,\n"
+ + "CONSTRAINT id_63d5 PRIMARY KEY (\"id\"),\n"
+ + "\tCONSTRAINT name_49b6 UNIQUE (\"name\")\n"
+ + ");\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"id\" IS 'id';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"name\" IS 'name';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"age\" IS 'age';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"createTime\" IS 'createTime';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"lastUpdateTime\" IS 'lastUpdateTime';";
+
+ String regex1 = "id_\\w+";
+ String regex2 = "name_\\w+";
+ String replacedStr1 = createTableSql.replaceAll(regex1,
"id_").replaceAll(regex2, "name_");
+ String replacedStr2 = expect.replaceAll(regex1,
"id_").replaceAll(regex2, "name_");
+ Assertions.assertEquals(replacedStr2, replacedStr1);
+
+ // skip index
+ String createTableSqlSkipIndex =
+ new DamengCreateTableSqlBuilder(catalogTable,
false).build(tablePath);
+ // create table sql is change; The old unit tests are no longer
applicable
+ String expectSkipIndex =
+ "CREATE TABLE \"test_schema\".\"test_table\" (\n"
+ + "\"id\" BIGINT NOT NULL,\n"
+ + "\"name\" VARCHAR2(128) NOT NULL,\n"
+ + "\"age\" INT,\n"
+ + "\"createTime\" TIMESTAMP,\n"
+ + "\"lastUpdateTime\" TIMESTAMP\n"
+ + ");\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"id\" IS 'id';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"name\" IS 'name';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"age\" IS 'age';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"createTime\" IS 'createTime';\n"
+ + "COMMENT ON COLUMN
\"test_schema\".\"test_table\".\"lastUpdateTime\" IS 'lastUpdateTime';";
+ Assertions.assertEquals(expectSkipIndex, createTableSqlSkipIndex);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
new file mode 100644
index 0000000000..b0f6d42235
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengJdbcTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.dm;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+@Disabled("Please Test it in your local environment")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class DamengJdbcTest {
+
+ private static final JdbcUrlUtil.UrlInfo DM_URL_INFO =
+ JdbcUrlUtil.getUrlInfo("jdbc:dm://172.16.17.156:30236");
+
+ private static final String DATABASE_NAME = "DAMENG";
+ private static final String SCHEMA_NAME = "DM_USER01";
+ private static final String TABLE_NAME = "STUDENT_INFO";
+
+ private static final TablePath TABLE_PATH_DM =
+ TablePath.of(DATABASE_NAME, SCHEMA_NAME, TABLE_NAME);
+
+ private static DamengCatalog DAMENG_CATALOG;
+
+ private static CatalogTable DM_CATALOGTABLE;
+
+ @BeforeAll
+ static void before() {
+ DAMENG_CATALOG =
+ new DamengCatalog("DAMENG_CATALOG", "DM_USER01", "Te$Dt_1234",
DM_URL_INFO, null);
+ DAMENG_CATALOG.open();
+ }
+
+ @Test
+ @Order(1)
+ void exists() {
+ Assertions.assertTrue(DAMENG_CATALOG.databaseExists(DATABASE_NAME));
+ Assertions.assertTrue(DAMENG_CATALOG.tableExists(TABLE_PATH_DM));
+ }
+
+ @Test
+ @Order(2)
+ void createTableInternal() {
+ Assertions.assertDoesNotThrow(
+ () -> DM_CATALOGTABLE =
DAMENG_CATALOG.getTable(TABLE_PATH_DM));
+ Assertions.assertDoesNotThrow(
+ () ->
+ DAMENG_CATALOG.createTable(
+ TablePath.of(DATABASE_NAME, SCHEMA_NAME,
TABLE_NAME + "_test"),
+ DM_CATALOGTABLE,
+ false,
+ true));
+ }
+
+ @Test
+ @Order(3)
+ void dropTableInternal() {
+ Assertions.assertDoesNotThrow(
+ () ->
+ DAMENG_CATALOG.dropTable(
+ TablePath.of(DATABASE_NAME, SCHEMA_NAME,
TABLE_NAME + "_test"),
+ false));
+ }
+
+ @Test
+ @Order(4)
+ void createDatabaseInternal() {
+ Assertions.assertDoesNotThrow(() ->
DAMENG_CATALOG.createDatabase(TABLE_PATH_DM, true));
+ Assertions.assertThrows(
+ DatabaseAlreadyExistException.class,
+ () -> DAMENG_CATALOG.createDatabase(TABLE_PATH_DM, false));
+ RuntimeException catalogException =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () ->
+ DAMENG_CATALOG.createDatabase(
+ TablePath.of("test_db.test.test1"),
true));
+ Assertions.assertInstanceOf(
+ UnsupportedOperationException.class,
catalogException.getCause());
+ RuntimeException runtimeException =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () ->
+ DAMENG_CATALOG.createDatabase(
+ TablePath.of("test_db.test.test1"),
false));
+ Assertions.assertInstanceOf(
+ UnsupportedOperationException.class,
runtimeException.getCause());
+ }
+
+ @Test
+ @Order(5)
+ void dropDatabaseInternal() {
+ Assertions.assertDoesNotThrow(
+ () ->
DAMENG_CATALOG.dropDatabase(TablePath.of("test_db.test.test1"), true));
+ Assertions.assertThrows(
+ DatabaseNotExistException.class,
+ () ->
DAMENG_CATALOG.dropDatabase(TablePath.of("test_db.test.test1"), false));
+ RuntimeException runtimeException =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> DAMENG_CATALOG.dropDatabase(TABLE_PATH_DM,
true));
+ Assertions.assertInstanceOf(
+ UnsupportedOperationException.class,
runtimeException.getCause());
+ RuntimeException catalogException =
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> DAMENG_CATALOG.dropDatabase(TABLE_PATH_DM,
false));
+ Assertions.assertInstanceOf(
+ UnsupportedOperationException.class,
catalogException.getCause());
+ }
+
+ @Test
+ @Order(6)
+ void truncateTableInternal() {
+ Assertions.assertDoesNotThrow(() ->
DAMENG_CATALOG.truncateTable(TABLE_PATH_DM, false));
+ Assertions.assertDoesNotThrow(() ->
DAMENG_CATALOG.truncateTable(TABLE_PATH_DM, true));
+ }
+
+ @Test
+ @Order(7)
+ void listTablesInternal() {
+ Assertions.assertDoesNotThrow(() ->
DAMENG_CATALOG.listTables(DATABASE_NAME));
+ }
+
+ @Test
+ @Order(8)
+ void existsData() {
+ Assertions.assertFalse(DAMENG_CATALOG.isExistsData(TABLE_PATH_DM));
+
Assertions.assertTrue(DAMENG_CATALOG.isExistsData(TablePath.of("DAMENG.HIS.DEPARTMENTS")));
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
index 9bceb31a0d..e845bb1ebf 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmIT.java
@@ -220,6 +220,14 @@ public class JdbcDmIT extends AbstractJdbcIT {
return Pair.of(fieldNames, rows);
}
+ protected String buildTableInfoWithSchema(String database, String schema,
String table) {
+ return buildTableInfoWithSchema(schema, table);
+ }
+
+ protected void clearTable(String database, String schema, String table) {
+ clearTable(schema, table);
+ }
+
@Override
protected GenericContainer<?> initContainer() {
GenericContainer<?> container =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmSaveModeIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmSaveModeIT.java
new file mode 100644
index 0000000000..1a1f23ce58
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmSaveModeIT.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class JdbcDmSaveModeIT extends JdbcDmIT {
+
+ private static final String CREATE_SQL =
+ "create table if not exists %s"
+ + "(\n"
+ + " DM_BIT BIT,\n"
+ + " DM_INT INT,\n"
+ + " DM_INTEGER INTEGER,\n"
+ + " DM_PLS_INTEGER PLS_INTEGER,\n"
+ + " DM_TINYINT TINYINT,\n"
+ + "\n"
+ + " DM_BYTE BYTE,\n"
+ + " DM_SMALLINT SMALLINT,\n"
+ + " DM_BIGINT BIGINT,\n"
+ + "\n"
+ + " DM_NUMERIC NUMERIC,\n"
+ + " DM_NUMBER NUMBER,\n"
+ + " DM_DECIMAL DECIMAL,\n"
+ + " DM_DEC DEC,\n"
+ + "\n"
+ + " DM_REAL REAL,\n"
+ + " DM_FLOAT FLOAT,\n"
+ + " DM_DOUBLE_PRECISION DOUBLE PRECISION,\n"
+ + " DM_DOUBLE DOUBLE,\n"
+ + "\n"
+ + " DM_CHAR CHAR,\n"
+ + " DM_CHARACTER CHARACTER,\n"
+ + " DM_VARCHAR VARCHAR,\n"
+ + " DM_VARCHAR2 VARCHAR2,\n"
+ + " DM_TEXT TEXT,\n"
+ + " DM_LONG LONG,\n"
+ + " DM_LONGVARCHAR LONGVARCHAR,\n"
+ + " DM_CLOB CLOB,\n"
+ + "\n"
+ + " DM_TIMESTAMP TIMESTAMP,\n"
+ + " DM_DATETIME DATETIME,\n"
+ + " DM_DATE DATE,\n"
+ + "\n"
+ + " DM_BLOB BLOB,\n"
+ + " DM_BINARY BINARY,\n"
+ + " DM_VARBINARY VARBINARY,\n"
+ + " DM_LONGVARBINARY LONGVARBINARY,\n"
+ + " DM_IMAGE IMAGE,\n"
+ + " DM_BFILE BFILE,\n"
+ + " constraint PK_T_COL primary key (\"DM_INT\")"
+ + ")";
+
+ private static final String DM_SINK = "e2e_table_sink1";
+
+ private static final List<String> CONFIG_FILE =
+ Lists.newArrayList("/jdbc_dm_source_and_sink_savemode.conf");
+
+ @Override
+ JdbcCase getJdbcCase() {
+ JdbcCase jdbcCase = super.getJdbcCase();
+ jdbcCase.setUseSaveModeCreateTable(true);
+ jdbcCase.setSinkTable(DM_SINK);
+ jdbcCase.setConfigFile(CONFIG_FILE);
+ jdbcCase.setCreateSql(CREATE_SQL);
+ return jdbcCase;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_dm_source_and_sink_savemode.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_dm_source_and_sink_savemode.conf
new file mode 100644
index 0000000000..e9d2c00f76
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_dm_source_and_sink_savemode.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ url = "jdbc:dm://e2e_dmdb:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ query = """select * from "SYSDBA".e2e_table_source"""
+ }
+
+}
+
+sink {
+ Jdbc {
+ url = "jdbc:dm://e2e_dmdb:5236"
+ driver = "dm.jdbc.driver.DmDriver"
+ connection_check_timeout_sec = 1000
+ user = "SYSDBA"
+ password = "SYSDBA"
+ database = "DAMENG"
+ table = "SYSDBA.e2e_table_sink1"
+ generate_sink_sql = true
+ }
+}
+