haruki-830 commented on code in PR #4437:
URL: https://github.com/apache/flink-cdc/pull/4437#discussion_r3518580674


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java:
##########
@@ -203,8 +205,22 @@ public DataSource createDataSource(Context context) {
         String metadataList = config.get(METADATA_LIST);
         List<PostgreSQLReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
 
-        // Create a custom PostgresDataSource that passes the includeDatabaseInTableId flag
-        return new PostgresDataSource(configFactory, readableMetadataList);
+        String changelogModeRaw = config.get(CHANGELOG_MODE);
+        DebeziumChangelogMode changelogMode;
+        switch (changelogModeRaw.toLowerCase()) {

Review Comment:
   If `enumType` is adopted for `CHANGELOG_MODE`, this switch block can be
replaced by a single `config.get(CHANGELOG_MODE)` call that returns the enum
directly.
   It would also be good to validate that a primary key is present in upsert
mode, similar to what the SQL connector does in `PostgreSQLTableFactory`.
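
A minimal, self-contained sketch of the two suggestions, for illustration only. It does not use the Flink CDC API: `ChangelogModeSketch`, `Mode`, `parse`, and `validate` are hypothetical names, `Mode` is a stand-in for `DebeziumChangelogMode` (assumed here to have `ALL` and `UPSERT` constants), and the error messages are made up. `parse` approximates what an enum-typed option would do in place of the hand-written switch, and `validate` shows the kind of primary key check meant for upsert mode.

```java
import java.util.List;
import java.util.Locale;

/** Illustrative sketch only; none of these names come from the PR. */
final class ChangelogModeSketch {

    /** Stand-in for DebeziumChangelogMode (assumed constants: ALL, UPSERT). */
    enum Mode { ALL, UPSERT }

    /**
     * Roughly what an enum-typed option provides: the raw string is mapped
     * onto an enum constant, ignoring case, and unknown values are rejected.
     */
    static Mode parse(String raw) {
        try {
            // Locale.ROOT keeps the case conversion independent of the JVM default locale.
            return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported changelog mode: '" + raw + "'", e);
        }
    }

    /** Upsert mode needs a key to collapse changes on, so reject tables without one. */
    static void validate(Mode mode, List<String> primaryKeys) {
        if (mode == Mode.UPSERT && primaryKeys.isEmpty()) {
            throw new IllegalStateException("A primary key is required when upsert mode is selected");
        }
    }
}
```

With an enum-typed option the factory would no longer call anything like `parse` itself; it would only read the option and then run a check along the lines of `validate`.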



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
