This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 32b7f2b690 [Feature][CDC] Support tables without primary keys (with 
unique keys) (#163) (#5150)
32b7f2b690 is described below

commit 32b7f2b6906cbbd7c177ca00fabde974d384f06f
Author: hailin0 <[email protected]>
AuthorDate: Wed Jul 26 10:25:57 2023 +0800

    [Feature][CDC] Support tables without primary keys (with unique keys) 
(#163) (#5150)
---
 .../cdc/base/dialect/JdbcDataSourceDialect.java    | 100 +++++++++++++++++++++
 .../splitter/JdbcSourceChunkSplitter.java          |  44 +++++++++
 .../mysql/source/eumerator/MySqlChunkSplitter.java |  18 +---
 .../sqlserver/source/source/SqlServerDialect.java  |   5 +-
 .../source/eumerator/SqlServerChunkSplitter.java   |  18 +---
 .../sqlserver/source/utils/SqlServerSchema.java    |  18 ++--
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  10 ++-
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |  17 ++++
 8 files changed, 181 insertions(+), 49 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 2c93bf387a..17947ad1a6 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.cdc.base.dialect;
 
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionFactory;
@@ -25,11 +27,23 @@ import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 public interface JdbcDataSourceDialect extends 
DataSourceDialect<JdbcSourceConfig> {
 
@@ -68,4 +82,90 @@ public interface JdbcDataSourceDialect extends 
DataSourceDialect<JdbcSourceConfi
     @Override
     JdbcSourceFetchTaskContext createFetchTaskContext(
             SourceSplitBase sourceSplitBase, JdbcSourceConfig 
taskSourceConfig);
+
+    default Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, 
TableId tableId)
+            throws SQLException {
+
+        DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
+
+        // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), 
tableId.table());
+
+        // seq -> column name
+        List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
+        String pkName = null;
+        while (rs.next()) {
+            // all the PK_NAME should be the same
+            pkName = rs.getString("PK_NAME");
+            String columnName = rs.getString("COLUMN_NAME");
+            int keySeq = rs.getInt("KEY_SEQ");
+            // KEY_SEQ is 1-based index
+            primaryKeyColumns.add(Pair.of(keySeq, columnName));
+        }
+        // sort columns by KEY_SEQ, then keep only their names
+        List<String> pkFields =
+                primaryKeyColumns.stream()
+                        .sorted(Comparator.comparingInt(Pair::getKey))
+                        .map(Pair::getValue)
+                        .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(pkFields)) {
+            return Optional.empty();
+        }
+        return Optional.of(PrimaryKey.of(pkName, pkFields));
+    }
+
+    default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, 
TableId tableId)
+            throws SQLException {
+        return getConstraintKeys(jdbcConnection, tableId).stream()
+                .filter(
+                        constraintKey ->
+                                constraintKey.getConstraintType()
+                                        == 
ConstraintKey.ConstraintType.UNIQUE_KEY)
+                .collect(Collectors.toList());
+    }
+
+    default List<ConstraintKey> getConstraintKeys(JdbcConnection 
jdbcConnection, TableId tableId)
+            throws SQLException {
+        DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
+
+        ResultSet resultSet =
+                metaData.getIndexInfo(
+                        tableId.catalog(), tableId.schema(), tableId.table(), 
false, false);
+        // index name -> index
+        Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+        while (resultSet.next()) {
+            String columnName = resultSet.getString("COLUMN_NAME");
+            if (columnName == null) {
+                continue;
+            }
+
+            String indexName = resultSet.getString("INDEX_NAME");
+            boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
+
+            ConstraintKey constraintKey =
+                    constraintKeyMap.computeIfAbsent(
+                            indexName,
+                            s -> {
+                                ConstraintKey.ConstraintType constraintType =
+                                        ConstraintKey.ConstraintType.KEY;
+                                if (!noUnique) {
+                                    constraintType = 
ConstraintKey.ConstraintType.UNIQUE_KEY;
+                                }
+                                return ConstraintKey.of(
+                                        constraintType, indexName, new 
ArrayList<>());
+                            });
+
+            ConstraintKey.ColumnSortType sortType =
+                    "A".equals(resultSet.getString("ASC_OR_DESC"))
+                            ? ConstraintKey.ColumnSortType.ASC
+                            : ConstraintKey.ColumnSortType.DESC;
+            ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
+                    new ConstraintKey.ConstraintKeyColumn(columnName, 
sortType);
+            constraintKey.getColumnNames().add(constraintKeyColumn);
+        }
+        return new ArrayList<>(constraintKeyMap.values());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index 9e42d55263..bbad9d04b1 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -17,16 +17,22 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
 
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
 
 /** The {@code ChunkSplitter} used to split table into a set of chunks for 
JDBC data source. */
 public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@@ -161,4 +167,42 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
                 new String[] {splitColumn.name()},
                 new SeaTunnelDataType[] {fromDbzColumn(splitColumn)});
     }
+
+    default Column getSplitColumn(
+            JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
+            throws SQLException {
+        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
+        if (primaryKey.isPresent()) {
+            List<String> pkColumns = primaryKey.get().getColumnNames();
+
+            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+            for (String pkColumn : pkColumns) {
+                Column column = table.columnWithName(pkColumn);
+                if (isEvenlySplitColumn(column)) {
+                    return column;
+                }
+            }
+        }
+
+        List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
+        if (!uniqueKeys.isEmpty()) {
+            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+            for (ConstraintKey uniqueKey : uniqueKeys) {
+                List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
+                        uniqueKey.getColumnNames();
+                for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : 
uniqueKeyColumns) {
+                    Column column = 
table.columnWithName(uniqueKeyColumn.getColumnName());
+                    if (isEvenlySplitColumn(column)) {
+                        return column;
+                    }
+                }
+            }
+        }
+
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Incremental snapshot for tables requires primary 
key/unique key,"
+                                + " but table %s doesn't have primary key.",
+                        tableId));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index 05935d1701..04671d28f5 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 
 import java.math.BigDecimal;
@@ -67,8 +66,7 @@ public class MySqlChunkSplitter implements 
JdbcSourceChunkSplitter {
             LOG.info("Start splitting table {} into chunks...", tableId);
             long start = System.currentTimeMillis();
 
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            Column splitColumn = getSplitColumn(table);
+            Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
             final List<ChunkRange> chunks;
             try {
                 chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -393,18 +391,4 @@ public class MySqlChunkSplitter implements 
JdbcSourceChunkSplitter {
             LOG.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
         }
     }
-
-    public static Column getSplitColumn(Table table) {
-        List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary 
key,"
-                                    + " but table %s doesn't have primary 
key.",
-                            table.id()));
-        }
-
-        // use first field in primary key as the split key
-        return primaryKeys.get(0);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index 0494cd98e1..464d8637f7 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
 
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
@@ -47,7 +46,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.uti
 public class SqlServerDialect implements JdbcDataSourceDialect {
 
     private static final long serialVersionUID = 1L;
-    private final SourceConfig sourceConfig;
+    private final SqlServerSourceConfig sourceConfig;
 
     private transient SqlServerSchema sqlServerSchema;
 
@@ -95,7 +94,7 @@ public class SqlServerDialect implements 
JdbcDataSourceDialect {
     @Override
     public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, 
TableId tableId) {
         if (sqlServerSchema == null) {
-            sqlServerSchema = new SqlServerSchema();
+            sqlServerSchema = new 
SqlServerSchema(sourceConfig.getDbzConnectorConfig());
         }
         return sqlServerSchema.getTableSchema(jdbc, tableId);
     }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index 3de596fd7d..ac0b8165db 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -30,7 +30,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlS
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
@@ -64,8 +63,7 @@ public class SqlServerChunkSplitter implements 
JdbcSourceChunkSplitter {
             log.info("Start splitting table {} into chunks...", tableId);
             long start = System.currentTimeMillis();
 
-            Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
-            Column splitColumn = getSplitColumn(table);
+            Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
             final List<ChunkRange> chunks;
             try {
                 chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -390,18 +388,4 @@ public class SqlServerChunkSplitter implements 
JdbcSourceChunkSplitter {
             log.info("JdbcSourceChunkSplitter has split {} chunks for table 
{}", count, tableId);
         }
     }
-
-    public static Column getSplitColumn(Table table) {
-        List<Column> primaryKeys = table.primaryKeyColumns();
-        if (primaryKeys.isEmpty()) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "Incremental snapshot for tables requires primary 
key,"
-                                    + " but table %s doesn't have primary 
key.",
-                            table.id()));
-        }
-
-        // use first field in primary key as the split key
-        return primaryKeys.get(0);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
index 0e031a3cfd..83d51ae31b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 
 import io.debezium.connector.sqlserver.SqlServerConnection;
+import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
@@ -29,18 +30,18 @@ import 
io.debezium.relational.history.TableChanges.TableChange;
 
 import java.sql.SQLException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /** A component used to get schema by table path. */
 public class SqlServerSchema {
 
+    private final SqlServerConnectorConfig connectorConfig;
     private final Map<TableId, TableChange> schemasByTableId;
 
-    public SqlServerSchema() {
+    public SqlServerSchema(SqlServerConnectorConfig connectorConfig) {
         this.schemasByTableId = new ConcurrentHashMap<>();
+        this.connectorConfig = connectorConfig;
     }
 
     public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
@@ -55,16 +56,17 @@ public class SqlServerSchema {
 
     private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
         SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
-        Set<TableId> tableIdSet = new HashSet<>();
-        tableIdSet.add(tableId);
 
         final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
         Tables tables = new Tables();
-        tables.overwriteTable(tables.editOrCreateTable(tableId).create());
-
         try {
             sqlServerConnection.readSchema(
-                    tables, tableId.catalog(), tableId.schema(), null, null, 
false);
+                    tables,
+                    tableId.catalog(),
+                    tableId.schema(),
+                    connectorConfig.getTableFilters().dataCollectionFilter(),
+                    null,
+                    false);
             Table table = tables.forTable(tableId);
             TableChange tableChange = new 
TableChange(TableChanges.TableChangeType.CREATE, table);
             tableChangeMap.put(tableId, tableChange);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 28da814325..247ecc651f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -180,9 +180,12 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         // index name -> index
         Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
         while (resultSet.next()) {
-            String indexName = resultSet.getString("INDEX_NAME");
             String columnName = resultSet.getString("COLUMN_NAME");
-            String unique = resultSet.getString("NON_UNIQUE");
+            if (columnName == null) {
+                continue;
+            }
+            String indexName = resultSet.getString("INDEX_NAME");
+            boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
 
             ConstraintKey constraintKey =
                     constraintKeyMap.computeIfAbsent(
@@ -190,8 +193,7 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
                             s -> {
                                 ConstraintKey.ConstraintType constraintType =
                                         ConstraintKey.ConstraintType.KEY;
-                                // 0 is unique.
-                                if ("0".equals(unique)) {
+                                if (!noUnique) {
                                     constraintType = 
ConstraintKey.ConstraintType.UNIQUE_KEY;
                                 }
                                 return ConstraintKey.of(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index a9bb1c1555..ef3f985432 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -37,6 +38,7 @@ import com.google.auto.service.AutoService;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS;
@@ -78,6 +80,21 @@ public class JdbcSinkFactory implements TableSinkFactory {
             PrimaryKey primaryKey = 
catalogTable.getTableSchema().getPrimaryKey();
             if (primaryKey != null && 
!CollectionUtils.isEmpty(primaryKey.getColumnNames())) {
                 map.put(PRIMARY_KEYS.key(), String.join(",", 
primaryKey.getColumnNames()));
+            } else {
+                Optional<ConstraintKey> keyOptional =
+                        
catalogTable.getTableSchema().getConstraintKeys().stream()
+                                .filter(
+                                        key ->
+                                                
ConstraintKey.ConstraintType.UNIQUE_KEY.equals(
+                                                        
key.getConstraintType()))
+                                .findFirst();
+                if (keyOptional.isPresent()) {
+                    map.put(
+                            PRIMARY_KEYS.key(),
+                            keyOptional.get().getColumnNames().stream()
+                                    .map(key -> key.getColumnName())
+                                    .collect(Collectors.joining(",")));
+                }
             }
             config = ReadonlyConfig.fromMap(new HashMap<>(map));
         }

Reply via email to