fresh-borzoni commented on code in PR #27372:
URL: https://github.com/apache/flink/pull/27372#discussion_r3378820006
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -44,6 +44,7 @@ import scala.collection.JavaConversions._
* for the standard logical algebra.
*/
class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
+ private val MaxGeneratedEnrichedKeys = 128
Review Comment:
One small deviation from the literal "new metadata read at the sink": an
intermediate Calc/Project between joins usually drops the redundant equated
column before we reach the sink, so by then the equivalence is already gone
and a single metadata value read there would miss it. So instead of a standalone
equivalence metadata I added a small resolver,
UpsertKeyEquivalenceUtil.upsertKeysWithin(rel, target, mq), called from the two
consumers (canUpsertKeysWithImmutableColsSatisfyPk and
StreamPhysicalSink.primaryKeysContainsUpsertKey). It threads the sink PK down
the plan as a fixed target and, on top of the keys getUpsertKeys already
reports, recovers the keys that fit within the target once an out-of-target
column is swapped for an equi-join-equivalent partner. The swap is resolved at
the deeper join, where the column still exists, and the key is then mapped back
up through the projection.
Since the target is the fixed (small) PK, nothing combinatorial is ever
materialized, so there's no explosion. Substitution only goes toward a
non-nullable column, so it's safe across outer joins, and it also covers the
fused MultiJoin.
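
To make the idea concrete, here is a rough standalone sketch of the substitution and the map-back step. It is not the code in the PR: columns are plain Int indices, and the names UpsertKeySubstitutionSketch, keysWithin and mapUp are made up for illustration. It also does a single substitution step per column (no transitive closure) and leaves out the plan traversal entirely.

```scala
// Illustrative only: hypothetical names, not the actual UpsertKeyEquivalenceUtil.
object UpsertKeySubstitutionSketch {
  type Key = Set[Int] // a key is a set of column indices

  /** Symmetric lookup: column -> columns equated to it by an equi-join condition. */
  def partners(equalities: Seq[(Int, Int)]): Map[Int, Set[Int]] =
    equalities
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (col, pairs) => col -> pairs.map(_._2).toSet }

  /**
   * Keeps keys that already fit in `target`, and recovers keys that fit once
   * every out-of-target column is swapped for an equated partner that is in
   * `target` and non-nullable. Keys that cannot be fully rewritten are dropped.
   */
  def keysWithin(
      upsertKeys: Set[Key],
      equalities: Seq[(Int, Int)],
      nonNullable: Set[Int],
      target: Set[Int]): Set[Key] = {
    val eq = partners(equalities)
    upsertKeys.flatMap { key =>
      val rewritten: Set[Option[Int]] = key.map { col =>
        if (target.contains(col)) Some(col)
        else
          eq.getOrElse(col, Set.empty[Int]).toSeq.sorted
            .find(p => target.contains(p) && nonNullable.contains(p))
      }
      if (rewritten.forall(_.isDefined)) Some(rewritten.collect { case Some(c) => c })
      else None
    }
  }

  /**
   * Maps a key over input columns to output columns of a projection, where
   * output i reads input projectedInputs(i). Returns None if the projection
   * dropped any column of the key.
   */
  def mapUp(key: Key, projectedInputs: Seq[Int]): Option[Key] = {
    val outputOf = projectedInputs.zipWithIndex.toMap
    if (key.forall(outputOf.contains)) Some(key.map(outputOf)) else None
  }
}
```

For example, take a join whose output is (0 = a.id, 1 = a.v, 2 = b.id) with a.id = b.id, a reported upsert key {2}, and a projection that keeps only columns 0 and 1. Mapping {2} through the projection loses the key, which is the "equivalence already gone at the sink" case. Resolving at the join with target {0} and column 0 non-nullable gives {0}, which does survive the projection.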
Let me know what you think.