This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 32b7f2b690 [Feature][CDC] Support tables without primary keys (with
unique keys) (#163) (#5150)
32b7f2b690 is described below
commit 32b7f2b6906cbbd7c177ca00fabde974d384f06f
Author: hailin0 <[email protected]>
AuthorDate: Wed Jul 26 10:25:57 2023 +0800
[Feature][CDC] Support tables without primary keys (with unique keys)
(#163) (#5150)
---
.../cdc/base/dialect/JdbcDataSourceDialect.java | 100 +++++++++++++++++++++
.../splitter/JdbcSourceChunkSplitter.java | 44 +++++++++
.../mysql/source/eumerator/MySqlChunkSplitter.java | 18 +---
.../sqlserver/source/source/SqlServerDialect.java | 5 +-
.../source/eumerator/SqlServerChunkSplitter.java | 18 +---
.../sqlserver/source/utils/SqlServerSchema.java | 18 ++--
.../jdbc/catalog/AbstractJdbcCatalog.java | 10 ++-
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 17 ++++
8 files changed, 181 insertions(+), 49 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 2c93bf387a..17947ad1a6 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.cdc.base.dialect;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionFactory;
@@ -25,11 +27,23 @@ import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask
import
org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
public interface JdbcDataSourceDialect extends
DataSourceDialect<JdbcSourceConfig> {
@@ -68,4 +82,90 @@ public interface JdbcDataSourceDialect extends
DataSourceDialect<JdbcSourceConfi
@Override
JdbcSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig
taskSourceConfig);
+
+ default Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection,
TableId tableId)
+ throws SQLException {
+
+ DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
+
+ // According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
+ // the returned primary key columns are ordered by COLUMN_NAME, not by
KEY_SEQ.
+ // We need to sort them based on the KEY_SEQ value.
+ ResultSet rs =
+ metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(),
tableId.table());
+
+ // seq -> column name
+ List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
+ String pkName = null;
+ while (rs.next()) {
+ // all the PK_NAME should be the same
+ pkName = rs.getString("PK_NAME");
+ String columnName = rs.getString("COLUMN_NAME");
+ int keySeq = rs.getInt("KEY_SEQ");
+ // KEY_SEQ is 1-based index
+ primaryKeyColumns.add(Pair.of(keySeq, columnName));
+ }
+ // initialize size
+ List<String> pkFields =
+ primaryKeyColumns.stream()
+ .sorted(Comparator.comparingInt(Pair::getKey))
+ .map(Pair::getValue)
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(pkFields)) {
+ return Optional.empty();
+ }
+ return Optional.of(PrimaryKey.of(pkName, pkFields));
+ }
+
+ default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection,
TableId tableId)
+ throws SQLException {
+ return getConstraintKeys(jdbcConnection, tableId).stream()
+ .filter(
+ constraintKey ->
+ constraintKey.getConstraintType()
+ ==
ConstraintKey.ConstraintType.UNIQUE_KEY)
+ .collect(Collectors.toList());
+ }
+
+ default List<ConstraintKey> getConstraintKeys(JdbcConnection
jdbcConnection, TableId tableId)
+ throws SQLException {
+ DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
+
+ ResultSet resultSet =
+ metaData.getIndexInfo(
+ tableId.catalog(), tableId.schema(), tableId.table(),
false, false);
+ // index name -> index
+ Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+ while (resultSet.next()) {
+ String columnName = resultSet.getString("COLUMN_NAME");
+ if (columnName == null) {
+ continue;
+ }
+
+ String indexName = resultSet.getString("INDEX_NAME");
+ boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
+
+ ConstraintKey constraintKey =
+ constraintKeyMap.computeIfAbsent(
+ indexName,
+ s -> {
+ ConstraintKey.ConstraintType constraintType =
+ ConstraintKey.ConstraintType.KEY;
+ if (!noUnique) {
+ constraintType =
ConstraintKey.ConstraintType.UNIQUE_KEY;
+ }
+ return ConstraintKey.of(
+ constraintType, indexName, new
ArrayList<>());
+ });
+
+ ConstraintKey.ColumnSortType sortType =
+ "A".equals(resultSet.getString("ASC_OR_DESC"))
+ ? ConstraintKey.ColumnSortType.ASC
+ : ConstraintKey.ColumnSortType.DESC;
+ ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
+ new ConstraintKey.ConstraintKeyColumn(columnName,
sortType);
+ constraintKey.getColumnNames().add(constraintKeyColumn);
+ }
+ return new ArrayList<>(constraintKeyMap.values());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index 9e42d55263..bbad9d04b1 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -17,16 +17,22 @@
package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
/** The {@code ChunkSplitter} used to split table into a set of chunks for
JDBC data source. */
public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@@ -161,4 +167,42 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
new String[] {splitColumn.name()},
new SeaTunnelDataType[] {fromDbzColumn(splitColumn)});
}
+
+ default Column getSplitColumn(
+ JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId
tableId)
+ throws SQLException {
+ Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
+ if (primaryKey.isPresent()) {
+ List<String> pkColumns = primaryKey.get().getColumnNames();
+
+ Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+ for (String pkColumn : pkColumns) {
+ Column column = table.columnWithName(pkColumn);
+ if (isEvenlySplitColumn(column)) {
+ return column;
+ }
+ }
+ }
+
+ List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
+ if (!uniqueKeys.isEmpty()) {
+ Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+ for (ConstraintKey uniqueKey : uniqueKeys) {
+ List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
+ uniqueKey.getColumnNames();
+ for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn :
uniqueKeyColumns) {
+ Column column =
table.columnWithName(uniqueKeyColumn.getColumnName());
+ if (isEvenlySplitColumn(column)) {
+ return column;
+ }
+ }
+ }
+ }
+
+ throw new UnsupportedOperationException(
+ String.format(
+ "Incremental snapshot for tables requires primary
key/unique key,"
+ + " but table %s doesn't have primary key.",
+ tableId));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index 05935d1701..04671d28f5 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.math.BigDecimal;
@@ -67,8 +66,7 @@ public class MySqlChunkSplitter implements
JdbcSourceChunkSplitter {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- Column splitColumn = getSplitColumn(table);
+ Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -393,18 +391,4 @@ public class MySqlChunkSplitter implements
JdbcSourceChunkSplitter {
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
}
}
-
- public static Column getSplitColumn(Table table) {
- List<Column> primaryKeys = table.primaryKeyColumns();
- if (primaryKeys.isEmpty()) {
- throw new UnsupportedOperationException(
- String.format(
- "Incremental snapshot for tables requires primary
key,"
- + " but table %s doesn't have primary
key.",
- table.id()));
- }
-
- // use first field in primary key as the split key
- return primaryKeys.get(0);
- }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index 0494cd98e1..464d8637f7 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -19,7 +19,6 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
-import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
@@ -47,7 +46,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.uti
public class SqlServerDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
- private final SourceConfig sourceConfig;
+ private final SqlServerSourceConfig sourceConfig;
private transient SqlServerSchema sqlServerSchema;
@@ -95,7 +94,7 @@ public class SqlServerDialect implements
JdbcDataSourceDialect {
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc,
TableId tableId) {
if (sqlServerSchema == null) {
- sqlServerSchema = new SqlServerSchema();
+ sqlServerSchema = new
SqlServerSchema(sourceConfig.getDbzConnectorConfig());
}
return sqlServerSchema.getTableSchema(jdbc, tableId);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index 3de596fd7d..ac0b8165db 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -30,7 +30,6 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlS
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
-import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
@@ -64,8 +63,7 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
log.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();
- Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
- Column splitColumn = getSplitColumn(table);
+ Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -390,18 +388,4 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
log.info("JdbcSourceChunkSplitter has split {} chunks for table
{}", count, tableId);
}
}
-
- public static Column getSplitColumn(Table table) {
- List<Column> primaryKeys = table.primaryKeyColumns();
- if (primaryKeys.isEmpty()) {
- throw new UnsupportedOperationException(
- String.format(
- "Incremental snapshot for tables requires primary
key,"
- + " but table %s doesn't have primary
key.",
- table.id()));
- }
-
- // use first field in primary key as the split key
- return primaryKeys.get(0);
- }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
index 0e031a3cfd..83d51ae31b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import io.debezium.connector.sqlserver.SqlServerConnection;
+import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@@ -29,18 +30,18 @@ import
io.debezium.relational.history.TableChanges.TableChange;
import java.sql.SQLException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/** A component used to get schema by table path. */
public class SqlServerSchema {
+ private final SqlServerConnectorConfig connectorConfig;
private final Map<TableId, TableChange> schemasByTableId;
- public SqlServerSchema() {
+ public SqlServerSchema(SqlServerConnectorConfig connectorConfig) {
this.schemasByTableId = new ConcurrentHashMap<>();
+ this.connectorConfig = connectorConfig;
}
public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
@@ -55,16 +56,17 @@ public class SqlServerSchema {
private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
- Set<TableId> tableIdSet = new HashSet<>();
- tableIdSet.add(tableId);
final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
- tables.overwriteTable(tables.editOrCreateTable(tableId).create());
-
try {
sqlServerConnection.readSchema(
- tables, tableId.catalog(), tableId.schema(), null, null,
false);
+ tables,
+ tableId.catalog(),
+ tableId.schema(),
+ connectorConfig.getTableFilters().dataCollectionFilter(),
+ null,
+ false);
Table table = tables.forTable(tableId);
TableChange tableChange = new
TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 28da814325..247ecc651f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -180,9 +180,12 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
- String indexName = resultSet.getString("INDEX_NAME");
String columnName = resultSet.getString("COLUMN_NAME");
- String unique = resultSet.getString("NON_UNIQUE");
+ if (columnName == null) {
+ continue;
+ }
+ String indexName = resultSet.getString("INDEX_NAME");
+ boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
ConstraintKey constraintKey =
constraintKeyMap.computeIfAbsent(
@@ -190,8 +193,7 @@ public abstract class AbstractJdbcCatalog implements
Catalog {
s -> {
ConstraintKey.ConstraintType constraintType =
ConstraintKey.ConstraintType.KEY;
- // 0 is unique.
- if ("0".equals(unique)) {
+ if (!noUnique) {
constraintType =
ConstraintKey.ConstraintType.UNIQUE_KEY;
}
return ConstraintKey.of(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index a9bb1c1555..ef3f985432 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -37,6 +38,7 @@ import com.google.auto.service.AutoService;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS;
@@ -78,6 +80,21 @@ public class JdbcSinkFactory implements TableSinkFactory {
PrimaryKey primaryKey =
catalogTable.getTableSchema().getPrimaryKey();
if (primaryKey != null &&
!CollectionUtils.isEmpty(primaryKey.getColumnNames())) {
map.put(PRIMARY_KEYS.key(), String.join(",",
primaryKey.getColumnNames()));
+ } else {
+ Optional<ConstraintKey> keyOptional =
+
catalogTable.getTableSchema().getConstraintKeys().stream()
+ .filter(
+ key ->
+
ConstraintKey.ConstraintType.UNIQUE_KEY.equals(
+
key.getConstraintType()))
+ .findFirst();
+ if (keyOptional.isPresent()) {
+ map.put(
+ PRIMARY_KEYS.key(),
+ keyOptional.get().getColumnNames().stream()
+ .map(key -> key.getColumnName())
+ .collect(Collectors.joining(",")));
+ }
}
config = ReadonlyConfig.fromMap(new HashMap<>(map));
}