Hisoka-X commented on code in PR #7840:
URL: https://github.com/apache/seatunnel/pull/7840#discussion_r1802245393
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java:
##########
@@ -119,6 +119,26 @@ default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId
.collect(Collectors.toList());
}
+ default Boolean isUniqueKey(JdbcConnection jdbcConnection, TableId tableId, String columnName)
Review Comment:
```suggestion
default boolean isUniqueKey(JdbcConnection jdbcConnection, TableId tableId, String columnName)
```
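A likely motivation for the primitive return type (an assumption; the comment itself gives no reason): a boxed `Boolean` can be `null`, and a caller that writes `if (isUniqueKey)` auto-unboxes it and would throw a `NullPointerException`. The sketch below uses hypothetical stand-in methods, not the SeaTunnel dialect API, to show the difference.

```java
public class BoxedBooleanDemo {

    // Hypothetical stand-in for a method declared with a boxed return type.
    // Nothing stops an implementation from returning null.
    static Boolean boxedLookup(String columnName) {
        return columnName == null ? null : Boolean.TRUE;
    }

    // Same idea with a primitive return type: null is not representable.
    static boolean primitiveLookup(String columnName) {
        return columnName != null;
    }

    // Returns true if using the boxed result in an `if` throws on unboxing.
    static boolean unboxingThrows(String columnName) {
        try {
            if (boxedLookup(columnName)) {
                return false;
            }
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(unboxingThrows(null));   // true
        System.out.println(unboxingThrows("id"));   // false
        System.out.println(primitiveLookup(null));  // false
    }
}
```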
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java:
##########
@@ -379,12 +380,38 @@ protected SnapshotSplit createSnapshotSplit(
protected Column getSplitColumn(
JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
throws SQLException {
- Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
Column splitColumn = null;
+ Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+
+ // first , compare user defined split column is in the primary key or unique key
+ Properties splitColumnProperties = new Properties();
Review Comment:
Why not just use a `Map` here instead of `Properties`?
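For illustration only, a minimal sketch of what the `Map` variant buys: `Properties` is a `Hashtable<Object, Object>`, so `get` returns `Object` and needs a cast that can fail at runtime, while a `Map<String, String>` is checked at compile time. The class, method names, and the `shop.orders` key are hypothetical; the real config accessor in the PR may look different.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SplitColumnLookupDemo {

    // Lookup as written in the diff: Properties.get returns Object, so a cast is needed.
    static String fromProperties(Properties props, String catalog, String table) {
        return (String) props.get(catalog + "." + table);
    }

    // Lookup against a typed map: no cast, and non-String values cannot be stored.
    static String fromMap(Map<String, String> splitColumns, String catalog, String table) {
        return splitColumns.get(catalog + "." + table);
    }

    // Returns true if a non-String value in Properties breaks the cast at runtime.
    static boolean propertiesCastFails() {
        Properties props = new Properties();
        props.put("shop.orders", 42); // compiles, because Properties accepts any Object
        try {
            fromProperties(props, "shop", "orders");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> splitColumns = new HashMap<>();
        splitColumns.put("shop.orders", "order_id");

        System.out.println(fromMap(splitColumns, "shop", "orders"));    // order_id
        System.out.println(fromMap(splitColumns, "shop", "customers")); // null
        System.out.println(propertiesCastFails());                      // true
    }
}
```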
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java:
##########
@@ -379,12 +380,38 @@ protected SnapshotSplit createSnapshotSplit(
protected Column getSplitColumn(
JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
throws SQLException {
- Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
Column splitColumn = null;
+ Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+
+ // first , compare user defined split column is in the primary key or unique key
+ Properties splitColumnProperties = new Properties();
+ try {
+ splitColumnProperties = sourceConfig.getSplitColumn();
+ } catch (Exception e) {
+ log.error("Config snapshot.split.column get exception in {}:{}", tableId, e);
+ }
+ String tableSc =
+ (String) splitColumnProperties.get(tableId.catalog() + "." + tableId.table());
+ Boolean isUniqueKey = dialect.isUniqueKey(jdbc, tableId, tableSc);
+ if (isUniqueKey) {
+ Column column = table.columnWithName(tableSc);
+ if (isEvenlySplitColumn(column)) {
+ return column;
+ } else {
+ log.warn(
"Config snapshot.split.column type in {} is not TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING",
+ tableId);
+ }
+ } else {
+ log.warn(
Review Comment:
```suggestion
log.info(
```
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java:
##########
@@ -119,6 +119,26 @@ default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId
.collect(Collectors.toList());
}
+ default Boolean isUniqueKey(JdbcConnection jdbcConnection, TableId tableId, String columnName)
+ throws SQLException {
+ boolean isUnique = false;
+ if (null != columnName) {
Review Comment:
```suggestion
if (StringUtils.isNotEmpty(columnName)) {
```
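The practical difference between the two guards is the empty string: `null != columnName` lets `""` through, while `StringUtils.isNotEmpty` rejects both `null` and `""` (but still accepts whitespace-only strings). The sketch below uses a local helper with the same contract as the commons-lang3 method so that it runs without dependencies; the helper and class names are illustrative.

```java
public class ColumnNameGuardDemo {

    // Local stand-in mirroring the contract of commons-lang3 StringUtils.isNotEmpty:
    // true only for non-null input with length > 0.
    static boolean isNotEmpty(CharSequence cs) {
        return cs != null && cs.length() > 0;
    }

    // The guard as written in the diff under review.
    static boolean nullCheckOnly(String columnName) {
        return null != columnName;
    }

    public static void main(String[] args) {
        System.out.println(nullCheckOnly(""));   // true: empty name passes the null check
        System.out.println(isNotEmpty(""));      // false
        System.out.println(isNotEmpty(null));    // false
        System.out.println(isNotEmpty("id"));    // true
        System.out.println(isNotEmpty(" "));     // true: whitespace is not "empty"
    }
}
```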