fresh-borzoni commented on code in PR #28090:
URL: https://github.com/apache/flink/pull/28090#discussion_r3373780259


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -1599,6 +1631,90 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
     }
   }
 
+  private def referencesNonUpsertKeyColumns(node: RelNode, rexNodes: 
Seq[RexNode]): Boolean = {
+    if (rexNodes.isEmpty) {
+      return false
+    }
+
+    val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(node.getCluster.getMetadataQuery)
+    val upsertKeys = fmq.getUpsertKeys(node)
+
+    if (upsertKeys == null || upsertKeys.isEmpty) {
+      return true
+    }
+
+    val fieldRefIndices = ImmutableBitSet.of(
+      
RexNodeExtractor.extractRefInputFields(JavaScalaConversionUtil.toJava(rexNodes)):
 _*)
+
+    !upsertKeys.exists(upsertKey => upsertKey.contains(fieldRefIndices))
+  }
+
+  private def hasNonUpsertKeyFilterPushedDown(ts: 
StreamPhysicalTableSourceScan): Boolean = {
+    val tableSourceTable = ts.getTable.unwrap(classOf[TableSourceTable])
+    if (tableSourceTable == null) {
+      return false
+    }
+
+    val filterSpec = tableSourceTable.abilitySpecs
+      .collectFirst { case spec: FilterPushDownSpec => spec }
+
+    filterSpec match {
+      case Some(spec) =>
+        val predicates = JavaScalaConversionUtil.toScala(spec.getPredicates)
+        referencesNonUpsertKeyColumns(ts, predicates)
+      case None => false

Review Comment:
   Good catch. I now remap the filter's refs through ProjectPushDownSpec before 
the upsert-key check so they're in the same index space, and conservatively 
keep UB if a filtered column got projected away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to