fresh-borzoni commented on code in PR #27372:
URL: https://github.com/apache/flink/pull/27372#discussion_r3378820006


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -44,6 +44,7 @@ import scala.collection.JavaConversions._
  * for the standard logical algebra.
  */
 class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
+  private val MaxGeneratedEnrichedKeys = 128

Review Comment:
   
   One small deviation from the literal "new metadata read at the sink": an 
intermediate Calc/Project between joins usually drops the redundant equated 
column before we reach the sink, so by then the equivalence is already gone 
and a single metadata value read at the sink would miss it. So instead of a 
standalone equivalence metadata, I added a small resolver, 
UpsertKeyEquivalenceUtil.upsertKeysWithin(rel, target, mq), called from the two 
consumers (canUpsertKeysWithImmutableColsSatisfyPk and 
StreamPhysicalSink.primaryKeysContainsUpsertKey). It threads the sink PK down 
the plan as a fixed target and, on top of the keys getUpsertKeys already 
reports, recovers the ones that fit after swapping an out-of-target column for 
an equi-join-equivalent partner. The swap is resolved at the deeper join, where 
the column still exists, and the resulting key is mapped back up through the 
projection.
   
   Since the target is the fixed (small) PK, nothing combinatorial is ever 
materialized, so there's no explosion. Substitution only goes toward a 
non-nullable column, so it's safe across outer joins, and it also covers the 
fused MultiJoin (inner joins only at the moment). 
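   
   To make the swap concrete, here is a toy model of the idea, not the code in 
the PR. Columns are plain Ints, and Equiv, partners and keysWithin are 
hypothetical names made up for this sketch; it does not use Calcite/Flink 
metadata and does not model mapping keys back up through a projection.

```scala
// Toy model only: none of these names are Flink or Calcite APIs.
object UpsertKeySwapSketch {
  type Key = Set[Int]

  /** An equi-join condition `left = right`, with a nullability flag per side. */
  final case class Equiv(left: Int, right: Int, leftNullable: Boolean, rightNullable: Boolean)

  /** Columns that may stand in for `col`: equated partners that are non-nullable. */
  def partners(col: Int, equivs: Seq[Equiv]): Set[Int] =
    equivs.flatMap { e =>
      if (e.left == col && !e.rightNullable) Some(e.right)
      else if (e.right == col && !e.leftNullable) Some(e.left)
      else None
    }.toSet

  /**
   * Keeps the reported keys that fit inside `target`, either as they are or after
   * swapping every out-of-target column for one in-target equivalent partner.
   * Each reported key yields at most one result, so nothing is enumerated.
   */
  def keysWithin(reported: Set[Key], target: Set[Int], equivs: Seq[Equiv]): Set[Key] =
    reported.flatMap { key =>
      val (inside, outside) = key.partition(target.contains)
      // For each out-of-target column, pick one in-target partner if there is one.
      val swapped = outside.toSeq.map(c => partners(c, equivs).intersect(target).headOption)
      if (swapped.forall(_.isDefined)) Some(inside ++ swapped.flatten) else None
    }
}
```

   With columns 0 = a.id and 2 = b.id equated by the join, a reported key {2} 
and a sink PK {0} resolve to {0} when column 0 is non-nullable, and to nothing 
when column 0 is nullable (the outer-join case).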
   
   Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

