Hisoka-X commented on code in PR #7840:
URL: https://github.com/apache/seatunnel/pull/7840#discussion_r1802245393


##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java:
##########
@@ -119,6 +119,26 @@ default List<ConstraintKey> getUniqueKeys(JdbcConnection 
jdbcConnection, TableId
                 .collect(Collectors.toList());
     }
 
+    default Boolean isUniqueKey(JdbcConnection jdbcConnection, TableId 
tableId, String columnName)

Review Comment:
   ```suggestion
       default boolean isUniqueKey(JdbcConnection jdbcConnection, TableId 
tableId, String columnName)
   ```



##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java:
##########
@@ -379,12 +380,38 @@ protected SnapshotSplit createSnapshotSplit(
     protected Column getSplitColumn(
             JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
             throws SQLException {
-        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
         Column splitColumn = null;
+        Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+
+        // first , compare user defined split column is in the primary key or 
unique key
+        Properties splitColumnProperties = new Properties();

Review Comment:
   Why not just use Map?



##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java:
##########
@@ -379,12 +380,38 @@ protected SnapshotSplit createSnapshotSplit(
     protected Column getSplitColumn(
             JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId 
tableId)
             throws SQLException {
-        Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
         Column splitColumn = null;
+        Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
+
+        // first , compare user defined split column is in the primary key or 
unique key
+        Properties splitColumnProperties = new Properties();
+        try {
+            splitColumnProperties = sourceConfig.getSplitColumn();
+        } catch (Exception e) {
+            log.error("Config snapshot.split.column get exception in {}:{}", 
tableId, e);
+        }
+        String tableSc =
+                (String) splitColumnProperties.get(tableId.catalog() + "." + 
tableId.table());
+        Boolean isUniqueKey = dialect.isUniqueKey(jdbc, tableId, tableSc);
+        if (isUniqueKey) {
+            Column column = table.columnWithName(tableSc);
+            if (isEvenlySplitColumn(column)) {
+                return column;
+            } else {
+                log.warn(
+                        "Config snapshot.split.column type in {} is not 
TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING",
+                        tableId);
+            }
+        } else {
+            log.warn(

Review Comment:
   ```suggestion
               log.info(
   ```



##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java:
##########
@@ -119,6 +119,26 @@ default List<ConstraintKey> getUniqueKeys(JdbcConnection 
jdbcConnection, TableId
                 .collect(Collectors.toList());
     }
 
+    default Boolean isUniqueKey(JdbcConnection jdbcConnection, TableId 
tableId, String columnName)
+            throws SQLException {
+        boolean isUnique = false;
+        if (null != columnName) {

Review Comment:
   ```suggestion
           if (StringUtils.isNotEmpty(columnName)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to