fqshopify commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3197399261
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -139,7 +142,7 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
throws IOException {
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
- if (keyDeserialization == null && !hasMetadata) {
+ if (keyDeserialization == null && !hasMetadata && !hasValueProjection) {
Review Comment:
> value projection might be needed even if valueFormatProjectionPushdownLevel is set to NONE (i.e columns reordering)
Some clarifications:
- "projection" in Flink covers both narrowing *and* reordering of fields.
- This is unrelated to `valueFormatProjectionPushdownLevel` (as covered in
my earlier reply): that option only controls whether projection is pushed *all
the way down into the format*. The narrowing/reordering case above can occur
regardless of the level (even with `NONE`).
> So by default we are presenting another limitation on the shortcut path
The `&& !hasValueProjection` guard was effectively always there, just
implicit. Before this PR, the value deserializer was always built from a
`physicalDataType` that matched the produced row's value portion exactly. So
when there was no key and no metadata, the deserialized row *was* the produced
row, and emitting it directly was always safe.
With projection pushdown that's no longer guaranteed. Flink can now call
`applyProjection` and ask for a subset/reordering of the value fields. So the
deserialized row can legitimately differ from the produced row even when no key
and no metadata are involved — hence the new explicit guard.
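To make the reasoning above concrete, here is a minimal, self-contained sketch of the decision. It is not the connector code: rows are modelled as plain `Object[]`, and `ShortcutGuardSketch`, `emit`, `canEmitDirectly` and the helper `hasValueProjection(...)` are hypothetical names invented for illustration. It also assumes that "has value projection" means the decoded value row is not already laid out exactly like the produced row.

```java
import java.util.Arrays;

// Simplified, hypothetical model of the shortcut decision. Rows are plain
// Object[] and none of these names are Flink or connector APIs.
final class ShortcutGuardSketch {

    // Assumption: a value projection is "present" whenever the decoded value
    // row is not already laid out exactly like the produced row.
    static boolean hasValueProjection(int[] valueProjection, int decodedArity) {
        if (valueProjection.length != decodedArity) {
            return true; // narrowing: fewer fields are produced than decoded
        }
        for (int i = 0; i < valueProjection.length; i++) {
            if (valueProjection[i] != i) {
                return true; // reordering: same fields, different positions
            }
        }
        return false;
    }

    // Mirrors the guard in the diff: no key, no metadata, no value projection.
    static boolean canEmitDirectly(
            boolean hasKey, boolean hasMetadata, boolean hasValueProjection) {
        return !hasKey && !hasMetadata && !hasValueProjection;
    }

    // valueProjection[i] is the index in the decoded row of produced field i.
    // Key and metadata are fixed to "absent" to isolate the projection case.
    static Object[] emit(Object[] decoded, int[] valueProjection) {
        boolean projected = hasValueProjection(valueProjection, decoded.length);
        if (canEmitDirectly(false, false, projected)) {
            return decoded; // shortcut: the decoded row is the produced row
        }
        Object[] produced = new Object[valueProjection.length];
        for (int i = 0; i < valueProjection.length; i++) {
            produced[i] = decoded[valueProjection[i]];
        }
        return produced;
    }

    public static void main(String[] args) {
        Object[] decoded = {"id-1", 42, "payload"};
        // Identity layout: the shortcut returns the same instance.
        System.out.println(emit(decoded, new int[] {0, 1, 2}) == decoded); // true
        // Narrowing plus reordering: the row has to be rebuilt.
        System.out.println(Arrays.toString(emit(decoded, new int[] {2, 0}))); // [payload, id-1]
    }
}
```

In this model the identity case corresponds both to the pre-PR behaviour and to a format that already returns exactly the projected row; only a genuine mismatch falls off the shortcut path.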
> Is there any way to avoid it given deserialize is the record level hotpath?
The shortcut is still taken in every case where it was taken before. It
can now _also_ be taken when the projection is pushed down into the format
and the format returns exactly the row that downstream operators expect.
There are unit tests covering this in `DecoderTest`.