dawidwys commented on code in PR #26306:
URL: https://github.com/apache/flink/pull/26306#discussion_r2005514386


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -932,10 +961,325 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
     }
   }
 
-  // 
-------------------------------------------------------------------------------------------
+  /**
+   * A visitor which will try to satisfy the required [[DeleteKindTrait]] from 
root.
+   *
+   * <p>After being traversed by this visitor, every node should have a correct 
[[DeleteKindTrait]]. The
+   * visitor returns None if the planner cannot satisfy the required 
[[DeleteKindTrait]].
+   */
+  private class SatisfyDeleteKindTraitVisitor(private val context: 
StreamOptimizeContext) {
+
+    /**
+     * Try to satisfy the required [[DeleteKindTrait]] from root.
+     *
+     * <p>Each node will first require a DeleteKindTrait to its children. The 
required
+     * DeleteKindTrait may come from the node's parent, or come from the node 
itself, depending on
+     * whether the node will destroy the trait provided by children or pass 
the trait from children.
+     *
+     * <p>If the node will pass the children's DeleteKindTrait without 
destroying it, then return a
+     * new node with new inputs and forwarded DeleteKindTrait.
+     *
+     * <p>If the node will destroy the children's DeleteKindTrait, then the 
node itself needs to be
+     * converted, or a new node should be generated to satisfy the required 
trait, such as marking
+     * itself not to generate UPDATE_BEFORE, or generating a new node to 
filter UPDATE_BEFORE.
+     *
+     * @param rel
+     *   the node who should satisfy the requiredTrait
+     * @param requiredTrait
+     *   the required DeleteKindTrait
+     * @return
+     *   A converted node which satisfies required traits by input nodes of 
current node. Or None if
+     *   required traits cannot be satisfied.
+     */
+    def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait): 
Option[StreamPhysicalRel] =
+      rel match {
+        case sink: StreamPhysicalSink =>
+          val sinkRequiredTraits = inferSinkRequiredTraits(sink)
+          visitSink(sink, sinkRequiredTraits)
+
+        case sink: StreamPhysicalLegacySink[_] =>
+          val childModifyKindSet = getModifyKindSet(sink.getInput)
+          val fullDelete = fullDeleteOrNone(childModifyKindSet)
+          visitSink(sink, Seq(fullDelete))
+
+        case _: StreamPhysicalGroupAggregate | _: 
StreamPhysicalGroupTableAggregate |
+            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
+            _: StreamPhysicalPythonGroupTableAggregate | _: 
StreamPhysicalGroupWindowAggregateBase |
+            _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _: 
StreamPhysicalRank |
+            _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin |
+            _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
+            _: StreamPhysicalWatermarkAssigner | _: 
StreamPhysicalWindowTableFunction |
+            _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
+            _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
+            _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
+            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin 
|
+            _: StreamPhysicalProcessTableFunction =>
+          // if not explicitly supported, all operators require full deletes 
if there are updates
+          // ProcessTableFunction currently only consumes full deletes or 
insert-only
+          val children = rel.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              this.visit(child, fullDeleteOrNone(childModifyKindSet))
+          }.toList
+          createNewNode(rel, Some(children.flatten), 
fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case join: StreamPhysicalJoin =>
+          val children = join.getInputs.zipWithIndex.map {
+            case (child, childOrdinal) =>
+              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+              val supportsDeleteByKey = 
join.inputUniqueKeyContainsJoinKey(childOrdinal)
+              val inputModifyKindSet = getModifyKindSet(physicalChild)
+              if (supportsDeleteByKey && requiredTrait == DELETE_ON_KEY) {
+                this
+                  .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
+                  .orElse(this.visit(physicalChild, 
fullDeleteOrNone(inputModifyKindSet)))
+              } else {
+                this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
+              }
+          }
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val childRels = children.flatten.toList
+            if (childRels.exists(r => getDeleteKind(r) == 
DeleteKind.DELETE_ON_KEY)) {
+              createNewNode(join, Some(childRels), 
deleteOnKeyOrNone(getModifyKindSet(rel)))
+            } else {
+              createNewNode(join, Some(childRels), 
fullDeleteOrNone(getModifyKindSet(rel)))
+            }
+          }
+
+        case calc: StreamPhysicalCalcBase =>
+          if (
+            requiredTrait == DeleteKindTrait.DELETE_ON_KEY &&
+            calc.getProgram.getCondition != null
+          ) {
+            // this can be further improved by checking if the filter 
condition is on the key
+            None
+          } else {
+            // otherwise, forward DeleteKind requirement
+            visitChildren(rel, requiredTrait) match {
+              case None => None
+              case Some(children) =>
+                val childTrait = 
children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+                createNewNode(rel, Some(children), childTrait)
+            }
+          }
+
+        case _: StreamPhysicalExchange | _: StreamPhysicalExpand |
+            _: StreamPhysicalMiniBatchAssigner | _: 
StreamPhysicalDropUpdateBefore =>
+          // transparent forward requiredTrait to children
+          visitChildren(rel, requiredTrait) match {
+            case None => None
+            case Some(children) =>
+              val childTrait = 
children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+              createNewNode(rel, Some(children), childTrait)
+          }
+
+        case union: StreamPhysicalUnion =>
+          val children = union.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              val requiredChildTrait = if 
(!childModifyKindSet.contains(ModifyKind.DELETE)) {
+                DeleteKindTrait.NONE
+              } else {
+                requiredTrait
+              }
+              this.visit(child, requiredChildTrait)
+          }.toList
+
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val deleteKinds = children.flatten
+              .map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE))
+            // union can just forward changes, can't actively satisfy to 
another changelog mode
+            val providedTrait = if (deleteKinds.forall(k => 
DeleteKindTrait.NONE == k)) {
+              // if all the children is NO_DELETE, union is NO_DELETE

Review Comment:
   I had it as NO_DELETE initially 🤦 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to