twalthr commented on code in PR #26306:
URL: https://github.com/apache/flink/pull/26306#discussion_r2005126606


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java:
##########
@@ -71,7 +82,21 @@ public static ChangelogMode insertOnly() {
      * contain {@link RowKind#UPDATE_BEFORE} rows.
      */
     public static ChangelogMode upsert() {
-        return UPSERT;
+        return upsert(false);
+    }
+
+    /**
+     * Shortcut for an upsert changelog that describes idempotent updates on a key and thus does not
+     * contain {@link RowKind#UPDATE_BEFORE} rows.
+     *
+     * <p>Tells the system the DELETEs contain the full record and not just the key.

Review Comment:
   ```suggestion
     * @param withFullDeletes  Tells the system the DELETEs contain the full record and not just the key.
   ```
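   For illustration, a minimal sketch of how the overloaded factory could carry that parameter documentation. The parameter name `withFullDeletes` and the two constants follow the diff and the suggestion above; the method body is an assumption, not the PR's actual code:
   ```java
   /**
    * Shortcut for an upsert changelog that describes idempotent updates on a key and thus does not
    * contain {@link RowKind#UPDATE_BEFORE} rows.
    *
    * @param withFullDeletes Tells the system the DELETEs contain the full record and not just the key.
    */
   public static ChangelogMode upsert(boolean withFullDeletes) {
       // assumption: UPSERT keeps the historical key-only delete semantics,
       // UPSERT_WITH_FULL_DELETES is the new constant added in this PR
       return withFullDeletes ? UPSERT_WITH_FULL_DELETES : UPSERT;
   }
   ```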



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java:
##########
@@ -43,6 +43,15 @@ public final class ChangelogMode {
                     .addContainedKind(RowKind.INSERT)
                     .addContainedKind(RowKind.UPDATE_AFTER)
                     .addContainedKind(RowKind.DELETE)
+                    .deletesOnKey(true)
+                    .build();
+
+    private static final ChangelogMode UPSERT_WITH_FULL_DELETES =
+            ChangelogMode.newBuilder()
+                    .addContainedKind(RowKind.INSERT)
+                    .addContainedKind(RowKind.UPDATE_AFTER)
+                    .addContainedKind(RowKind.DELETE)
+                    .deletesOnKey(false)

Review Comment:
   Didn't we say to call this `keyOnlyDeletes`?
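   With the rename applied, the constant would read roughly like this (hypothetical sketch; only the builder method name changes, the boolean semantics stay as in the diff):
   ```java
   private static final ChangelogMode UPSERT_WITH_FULL_DELETES =
           ChangelogMode.newBuilder()
                   .addContainedKind(RowKind.INSERT)
                   .addContainedKind(RowKind.UPDATE_AFTER)
                   .addContainedKind(RowKind.DELETE)
                   // false: DELETE rows carry the full record, not just the key
                   .keyOnlyDeletes(false)
                   .build();
   ```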



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTraitDef$;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Checks if it is safe to remove ChangelogNormalize as part of {@link
+ * FlinkChangelogModeInferenceProgram}. It checks:
+ *
+ * <ul>
+ *   <li>if there is no filter pushed into the changelog normalize
+ *   <li>if we don't need to produce UPDATE_BEFORE
+ *   <li>we don't access any metadata columns
+ * </ul>
+ */
+public class ChangelogNormalizeRequirementResolver {
+
+    /** Checks if it is safe to remove ChangelogNormalize. */
+    public static boolean isRequired(StreamPhysicalChangelogNormalize normalize) {
+        if (normalize.filterCondition() != null) {
+            return true;
+        }
+        if (!Objects.equals(
+                normalize.getTraitSet().getTrait(UpdateKindTraitDef$.MODULE$.INSTANCE()),
+                UpdateKindTrait$.MODULE$.ONLY_UPDATE_AFTER())) {
+            return true;
+        }
+
+        // check if metadata columns are accessed
+        final RelNode input = normalize.getInput();
+
+        return visit(input, bitSetForAllOutputColumns(input));
+    }
+
+    private static ImmutableBitSet bitSetForAllOutputColumns(RelNode input) {
+        return ImmutableBitSet.builder().set(0, input.getRowType().getFieldCount()).build();
+    }
+
+    private static boolean visit(final RelNode rel, final ImmutableBitSet requiredColumns) {
+        if (rel instanceof StreamPhysicalCalcBase) {
+            return visitCalc((StreamPhysicalCalcBase) rel, requiredColumns);
+        } else if (rel instanceof StreamPhysicalTableSourceScan) {
+            return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, requiredColumns);
+        } else if (rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalExchange) {
+            // require all input columns
+            final RelNode input = ((SingleRel) rel).getInput();
+            return visit(input, bitSetForAllOutputColumns(input));
+        } else {
+            // these nodes should not be in an input of a changelog normalize
+            // StreamPhysicalChangelogNormalize
+            // StreamPhysicalDropUpdateBefore
+            // StreamPhysicalUnion
+            // StreamPhysicalSort
+            // StreamPhysicalLimit
+            // StreamPhysicalSortLimit
+            // StreamPhysicalTemporalSort
+            // StreamPhysicalWindowTableFunction
+            // StreamPhysicalWindowRank
+            // StreamPhysicalWindowDeduplicate
+            // StreamPhysicalRank
+            // StreamPhysicalOverAggregateBase
+            // CommonPhysicalJoin
+            // StreamPhysicalMatch
+            // StreamPhysicalMiniBatchAssigner
+            // StreamPhysicalExpand
+            // StreamPhysicalWindowAggregateBase
+            // StreamPhysicalGroupAggregateBase
+            // StreamPhysicalSink
+            // StreamPhysicalLegacySink
+            // StreamPhysicalCorrelateBase
+            // StreamPhysicalLookupJoin
+            // StreamPhysicalValues
+            // StreamPhysicalDataStreamScan
+            // StreamPhysicalLegacyTableSourceScan
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported to visit node %s. The node either should not be pushed"
+                                    + " through the changelog normalize or is not supported yet.",
+                            rel.getClass().getSimpleName()));
+        }
+    }
+
+    private static boolean visitTableSourceScan(
+            StreamPhysicalTableSourceScan tableScan, ImmutableBitSet requiredColumns) {
+        if (!(tableScan.tableSource() instanceof SupportsReadingMetadata)) {
+            // source does not have metadata, no need to check
+            return false;
+        }
+        TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class);

Review Comment:
   ```suggestion
           final TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class);
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -932,10 +961,325 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
     }
   }
 
-  // -------------------------------------------------------------------------------------------
+  /**
+   * A visitor which will try to satisfy the required [[DeleteKindTrait]] from root.
+   *
+   * <p>After traversed by this visitor, every node should have a correct [[DeleteKindTrait]] or
+   * returns None if the planner doesn't support to satisfy the required [[DeleteKindTrait]].
+   */
+  private class SatisfyDeleteKindTraitVisitor(private val context: StreamOptimizeContext) {
+
+    /**
+     * Try to satisfy the required [[DeleteKindTrait]] from root.
+     *
+     * <p>Each node will first require a DeleteKindTrait to its children. The required
+     * DeleteKindTrait may come from the node's parent, or come from the node itself, depending on
+     * whether the node will destroy the trait provided by children or pass the trait from children.
+     *
+     * <p>If the node will pass the children's DeleteKindTrait without destroying it, then return a
+     * new node with new inputs and forwarded DeleteKindTrait.
+     *
+     * <p>If the node will destroy the children's UpdateKindTrait, then the node itself needs to be
+     * converted, or a new node should be generated to satisfy the required trait, such as marking
+     * itself not to generate UPDATE_BEFORE, or generating a new node to filter UPDATE_BEFORE.
+     *
+     * @param rel
+     *   the node who should satisfy the requiredTrait
+     * @param requiredTrait
+     *   the required DeleteKindTrait
+     * @return
+     *   A converted node which satisfies required traits by input nodes of current node. Or None if
+     *   required traits cannot be satisfied.
+     */
+    def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait): Option[StreamPhysicalRel] =
+      rel match {
+        case sink: StreamPhysicalSink =>
+          val sinkRequiredTraits = inferSinkRequiredTraits(sink)
+          visitSink(sink, sinkRequiredTraits)
+
+        case sink: StreamPhysicalLegacySink[_] =>
+          val childModifyKindSet = getModifyKindSet(sink.getInput)
+          val fullDelete = fullDeleteOrNone(childModifyKindSet)
+          visitSink(sink, Seq(fullDelete))
+
+        case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
+            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
+            _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
+            _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _: StreamPhysicalRank |
+            _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin |
+            _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
+            _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction |
+            _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
+            _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
+            _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
+            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin |
+            _: StreamPhysicalProcessTableFunction =>
+          // if not explicitly supported, all operators require full deletes if there are updates
+          // ProcessTableFunction currently only consumes full deletes or insert-only
+          val children = rel.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              this.visit(child, fullDeleteOrNone(childModifyKindSet))
+          }.toList
+          createNewNode(rel, Some(children.flatten), fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case join: StreamPhysicalJoin =>
+          val children = join.getInputs.zipWithIndex.map {
+            case (child, childOrdinal) =>
+              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+              val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
+              val inputModifyKindSet = getModifyKindSet(physicalChild)
+              if (supportsDeleteByKey && requiredTrait == DELETE_ON_KEY) {
+                this
+                  .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
+                  .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)))
+              } else {
+                this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
+              }
+          }
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val childRels = children.flatten.toList
+            if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_ON_KEY)) {
+              createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel)))
+            } else {
+              createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel)))
+            }
+          }
+
+        case calc: StreamPhysicalCalcBase =>
+          if (
+            requiredTrait == DeleteKindTrait.DELETE_ON_KEY &&
+            calc.getProgram.getCondition != null
+          ) {
+            // this can be further improved by checking if the filter condition is on the key
+            None
+          } else {
+            // otherwise, forward DeleteKind requirement
+            visitChildren(rel, requiredTrait) match {
+              case None => None
+              case Some(children) =>
+                val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+                createNewNode(rel, Some(children), childTrait)
+            }
+          }
+
+        case _: StreamPhysicalExchange | _: StreamPhysicalExpand |
+            _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalDropUpdateBefore =>
+          // transparent forward requiredTrait to children
+          visitChildren(rel, requiredTrait) match {
+            case None => None
+            case Some(children) =>
+              val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+              createNewNode(rel, Some(children), childTrait)
+          }
+
+        case union: StreamPhysicalUnion =>
+          val children = union.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              val requiredChildTrait = if (!childModifyKindSet.contains(ModifyKind.DELETE)) {
+                DeleteKindTrait.NONE
+              } else {
+                requiredTrait
+              }
+              this.visit(child, requiredChildTrait)
+          }.toList
+
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val deleteKinds = children.flatten
+              .map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE))
+            // union can just forward changes, can't actively satisfy to another changelog mode
+            val providedTrait = if (deleteKinds.forall(k => DeleteKindTrait.NONE == k)) {
+              // if all the children is NO_DELETE, union is NO_DELETE
+              DeleteKindTrait.NONE
+            } else {
+              // otherwise, merge update kinds.
+              val merged = deleteKinds
+                .map(_.deleteKind)
+                .reduce {
+                  (l, r) =>
+                    (l, r) match {
+                      case (DeleteKind.NONE, r: DeleteKind) => r
+                      case (l: DeleteKind, DeleteKind.NONE) => l
+                      case (l: DeleteKind, r: DeleteKind) =>
+                        if (l == r) {
+                          l
+                        } else {
+                          // if any of the union input produces DELETE_ON_KEY, the union produces
+                          // delete on key
+                          DeleteKind.DELETE_ON_KEY
+                        }
+                    }
+                }
+              new DeleteKindTrait(merged)
+            }
+            createNewNode(union, Some(children.flatten), providedTrait)
+          }
+
+        case normalize: StreamPhysicalChangelogNormalize =>
+          // if
+          // 1. we don't need to produce UPDATE_BEFORE,
+          // 2. children can satisfy the required delete trait,
+          // 3. the normalize doesn't have filter condition which we'd lose,
+          // 4. we don't use metadata columns
+          // we can skip ChangelogNormalize
+          if (!ChangelogNormalizeRequirementResolver.isRequired(normalize)) {
+            visitChildren(normalize, requiredTrait) match {
+              case Some(children) =>
+                val input = children.head match {
+                  case exchange: StreamPhysicalExchange =>
+                    exchange.getInput
+                  case _ =>
+                    normalize.getInput
+                }
+                return Some(input.asInstanceOf[StreamPhysicalRel])
+              case _ =>
+            }
+          }
+          val childModifyKindTrait = getModifyKindSet(rel.getInput(0))
+
+          // prefer delete by key, but accept both
+          val children = visitChildren(normalize, deleteOnKeyOrNone(childModifyKindTrait))
+            .orElse(visitChildren(normalize, fullDeleteOrNone(childModifyKindTrait)))
+
+          // changelog normalize produces full deletes
+          createNewNode(rel, children, fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case ts: StreamPhysicalTableSourceScan =>
+          // currently only support BEFORE_AND_AFTER if source produces updates
+          val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
+          createNewNode(rel, Some(List()), providedTrait)
+
+        case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
+            _: StreamPhysicalValues =>
+          createNewNode(rel, Some(List()), DeleteKindTrait.NONE)
+
+        case _: StreamPhysicalIntermediateTableScan =>
+          createNewNode(rel, Some(List()), fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case _ =>
+          println(rel)

Review Comment:
   remove



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Plan tests for removal of redundant changelog normalize. */
+public class ChangelogNormalizeOptimizationTest extends TableTestBase {
+
+    private final JavaStreamTableTestUtil util = javaStreamTestUtil();
+
+    static List<TestSpec> getTests() {
+        return Arrays.asList(
+                TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.RETRACT_SINK),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA,
+                        SinkTable.UPSERT_SINK_METADATA),
+                TestSpec.selectWithoutMetadata(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA, SinkTable.UPSERT_SINK));
+    }
+
+    @AfterEach
+    void tearDown() {
+        Arrays.stream(util.tableEnv().listTables())
+                .forEach(t -> util.tableEnv().executeSql("DROP TABLE " + t));
+    }
+
+    @ParameterizedTest()
+    @MethodSource("getTests")
+    void testChangelogNormalizePlan(TestSpec spec) {
+        for (TableProperties tableProperties : spec.tablesToCreate) {
+            final String additonalColumns =
+                    String.join(",\n", tableProperties.getAdditionalColumns());
+            util.tableEnv()
+                    .executeSql(
+                            String.format(
+                                    "CREATE TABLE %s ( id INT,\n"
+                                            + " col1 INT,\n"
+                                            + " col2 STRING,\n"
+                                            + "%s"
+                                            + " PRIMARY KEY(id) NOT ENFORCED) WITH (%s)",
+                                    tableProperties.getTableName(),
+                                    StringUtils.isNullOrWhitespaceOnly(additonalColumns)
+                                            ? ""
+                                            : additonalColumns + ",\n",
+                                    String.join(",\n", tableProperties.getOptions())));
+        }
+        util.verifyRelPlanInsert(
+                spec.query,
+                JavaScalaConversionUtil.toScala(
+                        Collections.singletonList(ExplainDetail.CHANGELOG_MODE)));
+    }
+
+    interface TableProperties {
+
+        String getTableName();
+
+        List<String> getOptions();
+
+        List<String> getAdditionalColumns();
+    }
+
+    public enum SourceTable implements TableProperties {
+        UPSERT_SOURCE_PARTIAL_DELETES(
+                "upsert_table_partial_deletes",
+                "'connector' = 'values'",
+                "'changelog-mode' = 'I,UA,D'",

Review Comment:
   ```suggestion
                   "'changelog-mode' = 'UA,D'",
   ```
   Kafka does not output `I` so for a real scenario we should remove it here as well.



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml:
##########
@@ -0,0 +1,418 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testChangelogNormalizePlan[[10] select_with_filter_upsert_table_full_deletes_into_all_change_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM upsert_table_full_deletes WHERE col1 > 2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table], fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalFilter(condition=[>($1, 2)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table], fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+      +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2], changelogMode=[I,UA,D])

Review Comment:
   ```suggestion
          +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2], changelogMode=[I,UA,D])
   ```
   Should the `changelogMode` indicate the partial/full deletes? Maybe with a `~D` for "approx. delete"?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.trait;
+
+/** Lists all kinds of {@link ModifyKind#DELETE} operation. */
+public enum DeleteKind {
+
+    /** This kind indicates that operators do not emit {@link ModifyKind#DELETE} operation. */
+    NONE,
+
+    /**
+     * This kind indicates that operators can emit deletes with the key only. The rest of the row
+     * may be not present.
+     */
+    DELETE_ON_KEY,

Review Comment:
   ```suggestion
       DELETE_BY_KEY,
   ```
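   For reference, the enum with the suggested rename could look as follows (a sketch only; the name and Javadoc of the third constant are assumptions based on the `fullDeleteOrNone`/`FULL_DELETE` usages elsewhere in this PR):
   ```java
   /** Lists all kinds of {@link ModifyKind#DELETE} operation. */
   public enum DeleteKind {

       /** This kind indicates that operators do not emit {@link ModifyKind#DELETE} operation. */
       NONE,

       /**
        * This kind indicates that operators can emit deletes with the key only. The rest of the row
        * may not be present.
        */
       DELETE_BY_KEY,

       /** This kind indicates that operators emit deletes carrying the complete record. */
       FULL_DELETE
   }
   ```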



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Plan tests for removal of redundant changelog normalize. */
+public class ChangelogNormalizeOptimizationTest extends TableTestBase {
+
+    private final JavaStreamTableTestUtil util = javaStreamTestUtil();
+
+    static List<TestSpec> getTests() {
+        return Arrays.asList(
+                TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.RETRACT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.RETRACT_SINK),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA,
+                        SinkTable.UPSERT_SINK_METADATA),
+                TestSpec.selectWithoutMetadata(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA, SinkTable.UPSERT_SINK));
+    }
+
+    @AfterEach
+    void tearDown() {
+        Arrays.stream(util.tableEnv().listTables())
+                .forEach(t -> util.tableEnv().executeSql("DROP TABLE " + t));
+    }
+
+    @ParameterizedTest()
+    @MethodSource("getTests")
+    void testChangelogNormalizePlan(TestSpec spec) {
+        for (TableProperties tableProperties : spec.tablesToCreate) {
+            final String additonalColumns =

Review Comment:
   ```suggestion
               final String additionalColumns =
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTraitDef$;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Checks if it is safe to remove ChangelogNormalize as part of {@link
+ * FlinkChangelogModeInferenceProgram}. It checks:
+ *
+ * <ul>
+ *   <li>if there is no filter pushed into the changelog normalize
+ *   <li>if we don't need to produce UPDATE_BEFORE
+ *   <li>we don't access any metadata columns
+ * </ul>
+ */
+public class ChangelogNormalizeRequirementResolver {
+
+    /** Checks if it is safe to remove ChangelogNormalize. */
+    public static boolean isRequired(StreamPhysicalChangelogNormalize normalize) {
+        if (normalize.filterCondition() != null) {
+            return true;
+        }
+        if (!Objects.equals(
+                normalize.getTraitSet().getTrait(UpdateKindTraitDef$.MODULE$.INSTANCE()),
+                UpdateKindTrait$.MODULE$.ONLY_UPDATE_AFTER())) {
+            return true;
+        }
+
+        // check if metadata columns are accessed
+        final RelNode input = normalize.getInput();
+
+        return visit(input, bitSetForAllOutputColumns(input));
+    }
+
+    private static ImmutableBitSet bitSetForAllOutputColumns(RelNode input) {
+        return ImmutableBitSet.builder().set(0, input.getRowType().getFieldCount()).build();
+    }
+
+    private static boolean visit(final RelNode rel, final ImmutableBitSet requiredColumns) {
+        if (rel instanceof StreamPhysicalCalcBase) {
+            return visitCalc((StreamPhysicalCalcBase) rel, requiredColumns);
+        } else if (rel instanceof StreamPhysicalTableSourceScan) {
+            return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, requiredColumns);
+        } else if (rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalExchange) {
+            // require all input columns
+            final RelNode input = ((SingleRel) rel).getInput();
+            return visit(input, bitSetForAllOutputColumns(input));
+        } else {
+            // these nodes should not be in an input of a changelog normalize
+            // StreamPhysicalChangelogNormalize
+            // StreamPhysicalDropUpdateBefore
+            // StreamPhysicalUnion
+            // StreamPhysicalSort
+            // StreamPhysicalLimit
+            // StreamPhysicalSortLimit
+            // StreamPhysicalTemporalSort
+            // StreamPhysicalWindowTableFunction
+            // StreamPhysicalWindowRank
+            // StreamPhysicalWindowDeduplicate
+            // StreamPhysicalRank
+            // StreamPhysicalOverAggregateBase
+            // CommonPhysicalJoin
+            // StreamPhysicalMatch
+            // StreamPhysicalMiniBatchAssigner
+            // StreamPhysicalExpand
+            // StreamPhysicalWindowAggregateBase
+            // StreamPhysicalGroupAggregateBase
+            // StreamPhysicalSink
+            // StreamPhysicalLegacySink
+            // StreamPhysicalCorrelateBase
+            // StreamPhysicalLookupJoin
+            // StreamPhysicalValues
+            // StreamPhysicalDataStreamScan
+            // StreamPhysicalLegacyTableSourceScan
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported to visit node %s. The node either should not be pushed"
+                                    + " through the changelog normalize or is not supported yet.",
+                            rel.getClass().getSimpleName()));
+        }
+    }
+
+    private static boolean visitTableSourceScan(
+            StreamPhysicalTableSourceScan tableScan, ImmutableBitSet requiredColumns) {
+        if (!(tableScan.tableSource() instanceof SupportsReadingMetadata)) {
+            // source does not have metadata, no need to check
+            return false;
+        }
+        TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class);
+        assert sourceTable != null;
+        // check if requiredColumns contain metadata column
+        final List<Column.MetadataColumn> metadataColumns =
+                DynamicSourceUtils.extractMetadataColumns(
+                        sourceTable.contextResolvedTable().getResolvedSchema());
+        final Set<String> metaColumnSet =
+                metadataColumns.stream().map(Column::getName).collect(Collectors.toSet());
+        final List<String> columns = tableScan.getRowType().getFieldNames();
+        for (int index = 0; index < columns.size(); index++) {
+            String column = columns.get(index);
+            if (metaColumnSet.contains(column) && requiredColumns.get(index)) {
+                // we require metadata column, therefore, we cannot remove the changelog normalize
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static boolean visitCalc(StreamPhysicalCalcBase calc, ImmutableBitSet requiredColumns) {
+        // evaluate required columns from input
+        final List<RexNode> projects =
+                calc.getProgram().getProjectList().stream()
+                        .map(expr -> calc.getProgram().expandLocalRef(expr))
+                        .collect(Collectors.toList());
+        final Map<Integer, List<Integer>> outFromSourcePos =
+                FlinkRelUtil.extractSourceMapping(projects);
+        final List<Integer> conv2Inputs =
+                requiredColumns.toList().stream()
+                        .map(
+                                out ->
+                                        Optional.ofNullable(outFromSourcePos.get(out))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new TableException(

Review Comment:
   ```suggestion
                                                                   new IllegalStateException(
   ```
   This is not really user-facing but the system's problem.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -70,12 +72,33 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
     }
 
     val updateKindTraitVisitor = new SatisfyUpdateKindTraitVisitor(context)
-    val finalRoot = requiredUpdateKindTraits.flatMap {
+    val updateRoot = requiredUpdateKindTraits.flatMap {
       requiredUpdateKindTrait =>
        updateKindTraitVisitor.visit(rootWithModifyKindSet, requiredUpdateKindTrait)
     }
 
-    // step3: sanity check and return non-empty root
+    // step3: satisfy DeleteKind trait
+    val requiredDeleteKindTraits = if (rootModifyKindSet.contains(ModifyKind.DELETE)) {
+      root match {
+        case _: StreamPhysicalSink =>
+          // try DELETE_BY_KEY first, and then FULL_DELETE

Review Comment:
   Seems you had my suggestion above before ;-)



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.`trait`
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef}
+import org.apache.calcite.rel.RelNode
+
+class DeleteKindTraitDef extends RelTraitDef[DeleteKindTrait] {

Review Comment:
   Add JavaDoc
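   For example, a class-level doc along these lines would do (wording is only a suggestion; the doc-comment syntax is identical in Java and Scala):
   ```java
   /**
    * Definition of the delete kind trait, which describes whether DELETE changes seen by an
    * operator carry the full record or only the (upsert) key, so that the planner can reason
    * about when ChangelogNormalize can be dropped.
    */
   ```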



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -932,10 +961,325 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
     }
   }
 
-  // -------------------------------------------------------------------------------------------
+  /**
+   * A visitor which will try to satisfy the required [[DeleteKindTrait]] from root.
+   *
+   * <p>After traversed by this visitor, every node should have a correct [[DeleteKindTrait]] or
+   * returns None if the planner doesn't support to satisfy the required [[DeleteKindTrait]].
+   */
+  private class SatisfyDeleteKindTraitVisitor(private val context: StreamOptimizeContext) {
+
+    /**
+     * Try to satisfy the required [[DeleteKindTrait]] from root.
+     *
+     * <p>Each node will first require a DeleteKindTrait to its children. The required
+     * DeleteKindTrait may come from the node's parent, or come from the node itself, depending on
+     * whether the node will destroy the trait provided by children or pass the trait from children.
+     *
+     * <p>If the node will pass the children's DeleteKindTrait without destroying it, then return a
+     * new node with new inputs and forwarded DeleteKindTrait.
+     *
+     * <p>If the node will destroy the children's UpdateKindTrait, then the node itself needs to be
+     * converted, or a new node should be generated to satisfy the required trait, such as marking
+     * itself not to generate UPDATE_BEFORE, or generating a new node to filter UPDATE_BEFORE.
+     *
+     * @param rel
+     *   the node who should satisfy the requiredTrait
+     * @param requiredTrait
+     *   the required DeleteKindTrait
+     * @return
+     *   A converted node which satisfies required traits by input nodes of current node. Or None if
+     *   required traits cannot be satisfied.
+     */
+    def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait): Option[StreamPhysicalRel] =
+      rel match {
+        case sink: StreamPhysicalSink =>
+          val sinkRequiredTraits = inferSinkRequiredTraits(sink)
+          visitSink(sink, sinkRequiredTraits)
+
+        case sink: StreamPhysicalLegacySink[_] =>
+          val childModifyKindSet = getModifyKindSet(sink.getInput)
+          val fullDelete = fullDeleteOrNone(childModifyKindSet)
+          visitSink(sink, Seq(fullDelete))
+
+        case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
+            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
+            _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
+            _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _: StreamPhysicalRank |
+            _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin |
+            _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
+            _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction |
+            _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
+            _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
+            _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
+            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin |
+            _: StreamPhysicalProcessTableFunction =>
+          // if not explicitly supported, all operators require full deletes if there are updates
+          // ProcessTableFunction currently only consumes full deletes or insert-only
+          val children = rel.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              this.visit(child, fullDeleteOrNone(childModifyKindSet))
+          }.toList
+          createNewNode(rel, Some(children.flatten), fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case join: StreamPhysicalJoin =>
+          val children = join.getInputs.zipWithIndex.map {
+            case (child, childOrdinal) =>
+              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+              val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
+              val inputModifyKindSet = getModifyKindSet(physicalChild)
+              if (supportsDeleteByKey && requiredTrait == DELETE_ON_KEY) {
+                this
+                  .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
+                  .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)))
+              } else {
+                this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
+              }
+          }
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val childRels = children.flatten.toList
+            if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_ON_KEY)) {
+              createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel)))
+            } else {
+              createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel)))
+            }
+          }
+
+        case calc: StreamPhysicalCalcBase =>
+          if (
+            requiredTrait == DeleteKindTrait.DELETE_ON_KEY &&
+            calc.getProgram.getCondition != null
+          ) {
+            // this can be further improved by checking if the filter condition is on the key
+            None
+          } else {
+            // otherwise, forward DeleteKind requirement
+            visitChildren(rel, requiredTrait) match {
+              case None => None
+              case Some(children) =>
+                val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+                createNewNode(rel, Some(children), childTrait)
+            }
+          }
+
+        case _: StreamPhysicalExchange | _: StreamPhysicalExpand |
+            _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalDropUpdateBefore =>
+          // transparent forward requiredTrait to children
+          visitChildren(rel, requiredTrait) match {
+            case None => None
+            case Some(children) =>
+              val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+              createNewNode(rel, Some(children), childTrait)
+          }
+
+        case union: StreamPhysicalUnion =>
+          val children = union.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              val requiredChildTrait = if (!childModifyKindSet.contains(ModifyKind.DELETE)) {
+                DeleteKindTrait.NONE
+              } else {
+                requiredTrait
+              }
+              this.visit(child, requiredChildTrait)
+          }.toList
+
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val deleteKinds = children.flatten
+              .map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE))
+            // union can just forward changes, can't actively satisfy to another changelog mode
+            val providedTrait = if (deleteKinds.forall(k => DeleteKindTrait.NONE == k)) {
+              // if all the children is NO_DELETE, union is NO_DELETE

Review Comment:
   ```suggestion
                 // if all the children is NONE, union is NONE
   ```
   Or which enum value are you referring to in the comments?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java:
##########
@@ -577,6 +577,7 @@ void testFromAndToChangelogStreamUpsert() throws Exception {
                 tableEnv.fromChangelogStream(
                         changelogStream,
                         Schema.newBuilder().primaryKey("f0").build(),
+                        // produce partial deletes

Review Comment:
   Isn't the comment incorrect? It produces full deletes?
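   For context, a hypothetical sketch of the call site with a corrected comment, assuming the test passes the new `ChangelogMode.upsert(boolean)` factory introduced in this PR (the actual argument used by the test is not shown in this hunk):
   ```java
   tableEnv.fromChangelogStream(
           changelogStream,
           Schema.newBuilder().primaryKey("f0").build(),
           // produce full deletes: DELETE rows carry the complete record, not just the key
           ChangelogMode.upsert(true));
   ```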


