Savonitar commented on code in PR #174: URL: https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2818443510
########## flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java: ########## Review Comment: 1. Shouldn't we add `KEY_PROJECTION_PUSHDOWN_LEVEL` and `VALUE_PROJECTION_PUSHDOWN_LEVEL` to `optionalOptions()` method? The factory reads these options at line 190-191 (in `createDynamicTableSource`) and passes them to `KafkaDynamicSource`, but they aren't registered as optional options. As a result, when a user sets `key.format-projection-pushdown-level = TOP_LEVEL` on an `upsert-kafka` table, FactoryHelper will throw ``` ValidationException: Unsupported options found for 'upsert-kafka' ``` because the key isn't in the consumed set. Or am I missing something? 2. Could we add a test that creates an upsert-kafka table with a non-default pushdown level to catch this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
