fqshopify commented on code in PR #174:
URL: 
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2937422436


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java:
##########


Review Comment:
   Thanks for sharing this! And apologies for the delayed response, I've been 
massively backlogged at work. 
   
   I thought about this and decided to remove projection pushdown support for 
the upsert-kafka-connector for now in order to keep this PR slim/focused. It 
requires a few changes (see below) that would be better to address in a 
separate PR with proper testing. 
   
   > A possible fix: make DecodingFormatWrapper also implement 
ProjectableDecodingFormat and delegate createRuntimeDecoder(context, 
producedDataType, projections) to the inner format when it's a 
ProjectableDecodingFormat.
   
   This will create other issues. Rather than making `DecodingFormatWrapper` 
implement `ProjectableDecodingFormat`, I'd rather eliminate the wrapper 
entirely if possible. It exists only to override `getChangelogMode()`, but 
`KafkaDynamicSource` already has a `upsertMode` boolean so we can override the 
changelog mode directly in `KafkaDynamicSource.getChangelogMode()` when 
`upsertMode` is true, and pass the unwrapped format through. That way 
`Decoder.create()` sees the real format's capabilities (e.g. JSON's 
`ProjectableDecodingFormat`) without any delegation tricks. We can do all of 
that in a separate PR once this one goes in. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to