This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 00625b0019e492eab9e62d953f915c5afcc06a0a
Author: lincoln lee <[email protected]>
AuthorDate: Tue Sep 24 22:39:33 2024 +0800

    [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkChangelogModeInferenceProgram and StreamNonDeterministicUpdatePlanVisitor
---
 .../StreamNonDeterministicUpdatePlanVisitor.java      | 19 -------------------
 .../program/FlinkChangelogModeInferenceProgram.scala  | 14 +-------------
 2 files changed, 1 insertion(+), 32 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index 14e8b72c540..cf96ce7aab6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalC
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelateBase;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan;
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand;
@@ -184,8 +183,6 @@ public class StreamNonDeterministicUpdatePlanVisitor {
             return visitOverAggregate((StreamPhysicalOverAggregateBase) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalRank) {
             return visitRank((StreamPhysicalRank) rel, requireDeterminism);
-        } else if (rel instanceof StreamPhysicalDeduplicate) {
-            return visitDeduplicate((StreamPhysicalDeduplicate) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalWindowDeduplicate) {
             return visitWindowDeduplicate(
                     (StreamPhysicalWindowDeduplicate) rel, requireDeterminism);
@@ -677,22 +674,6 @@ public class StreamNonDeterministicUpdatePlanVisitor {
         }
     }
 
-    private StreamPhysicalRel visitDeduplicate(
-            final StreamPhysicalDeduplicate dedup, final ImmutableBitSet 
requireDeterminism) {
-        // output row type same as input and does not change output columns' 
order
-        if (inputInsertOnly(dedup)) {
-            // similar to rank, output is deterministic when input is insert 
only, so required
-            // determinism always be satisfied here.
-            return transmitDeterminismRequirement(dedup, 
NO_REQUIRED_DETERMINISM);
-        } else {
-            // Deduplicate always has unique key currently(exec node has null 
check and inner
-            // state only support data with keys), so only pass the left 
columns of required
-            // determinism to input.
-            return transmitDeterminismRequirement(
-                    dedup, 
requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys())));
-        }
-    }
-
     private StreamPhysicalRel visitWindowDeduplicate(
             final StreamPhysicalWindowDeduplicate winDedup,
             final ImmutableBitSet requireDeterminism) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index ddebacaf842..2843fc5e8d9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -155,18 +155,6 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         // ignore required trait from context, because sink is the true root
         sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]
 
-      case deduplicate: StreamPhysicalDeduplicate =>
-        // deduplicate only support insert only as input
-        val children = visitChildren(deduplicate, 
ModifyKindSetTrait.INSERT_ONLY)
-        val providedTrait = if (!deduplicate.keepLastRow && 
!deduplicate.isRowtime) {
-          // only proctime first row deduplicate does not produce UPDATE 
changes
-          ModifyKindSetTrait.INSERT_ONLY
-        } else {
-          // other deduplicate produce update changes
-          ModifyKindSetTrait.ALL_CHANGES
-        }
-        createNewNode(deduplicate, children, providedTrait, requiredTrait, 
requester)
-
       case agg: StreamPhysicalGroupAggregate =>
         // agg support all changes in input
         val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
@@ -490,7 +478,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           createNewNode(rel, children, requiredTrait)
 
         case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
-            _: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: 
StreamPhysicalMatch |
+            _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
             _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
             _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin 
=>
           // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, 
OverAggregate,

Reply via email to