nickdelnano commented on code in PR #4078: URL: https://github.com/apache/flink-cdc/pull/4078#discussion_r2257740909
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSinkFactory.java: ########## @@ -49,15 +50,21 @@ public class PaimonDataSinkFactory implements DataSinkFactory { @Override public DataSink createDataSink(Context context) { FactoryHelper.createFactoryHelper(this, context) - .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES); + .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_SPECIFIC_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES); Map<String, String> allOptions = context.getFactoryConfiguration().toMap(); Map<String, String> catalogOptions = new HashMap<>(); Map<String, String> tableOptions = new HashMap<>(); + Map<String, String> tableSpecificOptions = new HashMap<>(); allOptions.forEach( (key, value) -> { if (key.startsWith(PREFIX_TABLE_PROPERTIES)) { - tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value); + String keyWithoutPrefix = key.substring(PREFIX_TABLE_PROPERTIES.length()); + // General table option: table.properties.<property> + tableOptions.put(keyWithoutPrefix, value); + } else if (key.startsWith(PREFIX_SPECIFIC_TABLE_PROPERTIES)) { Review Comment: That is the behavior. Test case `testTableSpecificOptionsPrecedenceOverTableOptions` demonstrates it. The function here parses the configuration. Then [here](https://github.com/apache/flink-cdc/pull/4078/files#diff-5b829cedf9c430c9efc3a14629963c7fa7f3070eb6fbc470f4b0612a8ed63af3) the table specific overrides are applied when a table is created and its properties are set. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org