Mrart commented on code in PR #4115:
URL: https://github.com/apache/flink-cdc/pull/4115#discussion_r2617737675


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java:
##########
@@ -313,8 +318,12 @@ public PostgresSourceBuilder<T> includePartitionedTables(boolean includePartitio
     public PostgresIncrementalSource<T> build() {
         PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
         PostgresDialect dialect = new PostgresDialect(configFactory.create(0));
-        return new PostgresIncrementalSource<>(
-                configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+
+        PostgresIncrementalSource<T> source =
+                new PostgresIncrementalSource<>(
+                        configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+
+        return source;

Review Comment:
   Why is this change needed? Was returning the new instance directly a problem?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java:
##########
@@ -47,7 +49,7 @@
 /** The context for fetch task that fetching data of snapshot split from JDBC data source. */
 @Internal
 public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {
-
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);

Review Comment:
   Is this LOG actually used anywhere in the class?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/utils/JdbcChunkUtils.java:
##########
@@ -100,15 +108,55 @@ public static Object queryMin(
                 });
     }
 
+    // Write createTableFilter method here to avoid the dependency on DebeziumUtils
+    private static Tables.TableFilter createTableFilter(String schemaName, String tableName) {
+        return new Tables.TableFilter() {
+            @Override
+            public boolean isIncluded(TableId tableId) {
+                final String catalog = tableId.catalog();
+                final String schema = tableId.schema();
+                final String table = tableId.table();
+
+                if (schemaName != null && !schemaName.equalsIgnoreCase(schema)) {
+                    return false;
+                }
+
+                if (tableName != null && !tableName.equalsIgnoreCase(table)) {
+                    return false;
+                }

Review Comment:
   Can we extract these checks into separate helper functions?
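
One way the two name checks in the hunk above could be pulled into small helpers is sketched below. This is only an illustration: `TableFilterSketch`, `matchesIgnoreCase`, and `isIncluded` are hypothetical names, plain strings stand in for Debezium's `TableId`, and only the schema and table checks visible in the hunk are covered.

```java
/** Hypothetical helper; plain strings stand in for Debezium's TableId. */
final class TableFilterSketch {

    private TableFilterSketch() {}

    /** A null expected name means "no restriction"; otherwise compare case-insensitively. */
    public static boolean matchesIgnoreCase(String expected, String actual) {
        // equalsIgnoreCase(null) returns false, so a null actual name never matches a filter.
        return expected == null || expected.equalsIgnoreCase(actual);
    }

    /** Mirrors the two early-return checks from the hunk: schema first, then table. */
    public static boolean isIncluded(
            String schemaName, String tableName, String schema, String table) {
        return matchesIgnoreCase(schemaName, schema) && matchesIgnoreCase(tableName, table);
    }
}
```

With the comparison isolated like this, the null handling and the case-insensitive match can each be unit-tested without building a `Tables.TableFilter`.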



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java:
##########
@@ -52,6 +55,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
 
     private int lsnCommitCheckpointsDelay;
 
+    private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();

Review Comment:
   Is ConcurrentHashMap more appropriate?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java:
##########
@@ -131,15 +134,86 @@ public static TableId getTableId(SourceRecord dataRecord) {
 
     public static Object[] getSplitKey(
             RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
-        // the split key field contains single field now
         String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
-        Struct key = (Struct) dataRecord.key();
-        return new Object[] {key.get(splitFieldName)};
+
+        // Try primary key struct first (for backward compatibility)
+        Struct keyStruct = (Struct) dataRecord.key();
+        if (keyStruct != null && keyStruct.schema().field(splitFieldName) != null) {
+            return new Object[] {keyStruct.get(splitFieldName)};
+        }
+        LOG.info("Get Split Key From Value {} {}", dataRecord, splitFieldName);

Review Comment:
   Is this log needed here? If so, should it be at debug level rather than info?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java:
##########
@@ -131,15 +134,86 @@ public static TableId getTableId(SourceRecord dataRecord) {
 
     public static Object[] getSplitKey(
             RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
-        // the split key field contains single field now
         String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
-        Struct key = (Struct) dataRecord.key();
-        return new Object[] {key.get(splitFieldName)};
+
+        // Try primary key struct first (for backward compatibility)
+        Struct keyStruct = (Struct) dataRecord.key();
+        if (keyStruct != null && keyStruct.schema().field(splitFieldName) != null) {
+            return new Object[] {keyStruct.get(splitFieldName)};
+        }
+        LOG.info("Get Split Key From Value {} {}", dataRecord, splitFieldName);
+        // For non-primary key chunk keys, use value-based approach
+        return getSplitKeyFromValue(dataRecord, splitFieldName);
+    }
+
+    /** Extract chunk key from value struct (AFTER/BEFORE) for non-primary key chunk keys. */
+    private static Object[] getSplitKeyFromValue(SourceRecord dataRecord, String splitFieldName) {
+        Struct value = (Struct) dataRecord.value();
+        if (value == null) {
+            return null; // No value struct available
+        }

Review Comment:
   Could dataRecord itself be null here?
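
A rough sketch of a null-tolerant lookup is shown below. It is not the PR's implementation, whose remainder is not visible in this hunk. Nested `Map`s stand in for Kafka Connect `Struct` values, `SplitKeyFromValueSketch` and `splitKeyFromValue` are hypothetical names, and the order "after" then "before" is an assumption based on the Javadoc above.

```java
import java.util.Map;

/** Hypothetical sketch; nested Maps stand in for Kafka Connect Struct values. */
final class SplitKeyFromValueSketch {

    private SplitKeyFromValueSketch() {}

    /** Returns the split key read from "after", then "before", or null when unavailable. */
    public static Object[] splitKeyFromValue(Map<String, Object> value, String splitFieldName) {
        if (value == null || splitFieldName == null) {
            return null; // nothing to read from, mirroring the null return in the hunk
        }
        // Assumption: prefer the "after" image and fall back to "before" (e.g. for deletes).
        for (String side : new String[] {"after", "before"}) {
            Object image = value.get(side);
            if (image instanceof Map) {
                Map<?, ?> row = (Map<?, ?>) image;
                if (row.containsKey(splitFieldName)) {
                    return new Object[] {row.get(splitFieldName)};
                }
            }
        }
        return null; // field present in neither image
    }
}
```

Since the method can return null, callers of `getSplitKey` would need to handle that case explicitly.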



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.