This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 00625b0019e492eab9e62d953f915c5afcc06a0a Author: lincoln lee <[email protected]> AuthorDate: Tue Sep 24 22:39:33 2024 +0800 [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkChangelogModeInferenceProgram and StreamNonDeterministicUpdatePlanVisitor --- .../StreamNonDeterministicUpdatePlanVisitor.java | 19 ------------------- .../program/FlinkChangelogModeInferenceProgram.scala | 14 +------------- 2 files changed, 1 insertion(+), 32 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java index 14e8b72c540..cf96ce7aab6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java @@ -34,7 +34,6 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalC import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelateBase; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDataStreamScan; -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand; @@ -184,8 +183,6 @@ public class StreamNonDeterministicUpdatePlanVisitor { return visitOverAggregate((StreamPhysicalOverAggregateBase) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalRank) { return visitRank((StreamPhysicalRank) rel, requireDeterminism); - } else if (rel instanceof StreamPhysicalDeduplicate) { - return visitDeduplicate((StreamPhysicalDeduplicate) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalWindowDeduplicate) { return visitWindowDeduplicate( (StreamPhysicalWindowDeduplicate) rel, requireDeterminism); @@ -677,22 +674,6 @@ public class StreamNonDeterministicUpdatePlanVisitor { } } - private StreamPhysicalRel visitDeduplicate( - final StreamPhysicalDeduplicate dedup, final ImmutableBitSet requireDeterminism) { - // output row type same as input and does not change output columns' order - if (inputInsertOnly(dedup)) { - // similar to rank, output is deterministic when input is insert only, so required - // determinism always be satisfied here. - return transmitDeterminismRequirement(dedup, NO_REQUIRED_DETERMINISM); - } else { - // Deduplicate always has unique key currently(exec node has null check and inner - // state only support data with keys), so only pass the left columns of required - // determinism to input. - return transmitDeterminismRequirement( - dedup, requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys()))); - } - } - private StreamPhysicalRel visitWindowDeduplicate( final StreamPhysicalWindowDeduplicate winDedup, final ImmutableBitSet requireDeterminism) { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index ddebacaf842..2843fc5e8d9 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -155,18 +155,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti // ignore required trait from context, because sink is the true root sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel] - case deduplicate: StreamPhysicalDeduplicate => - // deduplicate only support insert only as input - val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY) - val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) { - // only proctime first row deduplicate does not produce UPDATE changes - ModifyKindSetTrait.INSERT_ONLY - } else { - // other deduplicate produce update changes - ModifyKindSetTrait.ALL_CHANGES - } - createNewNode(deduplicate, children, providedTrait, requiredTrait, requester) - case agg: StreamPhysicalGroupAggregate => // agg support all changes in input val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES) @@ -490,7 +478,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti createNewNode(rel, children, requiredTrait) case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate | - _: StreamPhysicalDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | + _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin | _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin => // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, OverAggregate,
