This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a134711aeca205aabfe3ce04c97e1f931c5a5a64 Author: lincoln lee <[email protected]> AuthorDate: Tue Sep 24 23:36:04 2024 +0800 [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdUpsertKeys --- .../plan/metadata/FlinkRelMdUpsertKeys.scala | 23 +++++++++++----------- .../table/planner/plan/stream/sql/RankTest.xml | 2 +- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala index 358862826a2..df151c8a607 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala @@ -24,7 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGr import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin import org.apache.flink.table.planner.plan.nodes.physical.stream._ import org.apache.flink.table.planner.plan.schema.IntermediateRelTable -import org.apache.flink.table.planner.plan.utils.FlinkRexUtil +import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, RankUtil} import com.google.common.collect.ImmutableSet import org.apache.calcite.plan.hep.HepRelVertex @@ -91,12 +91,17 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { } def getUpsertKeys(rel: Rank, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { - val inputKeys = filterKeys( - FlinkRelMetadataQuery - .reuseOrCreate(mq) - .getUpsertKeys(rel.getInput), - rel.partitionKey) - FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys) + rel match { + case rank: StreamPhysicalRank if RankUtil.isDeduplication(rel) => + ImmutableSet.of(ImmutableBitSet.of(rank.partitionKey.toArray.map(Integer.valueOf).toList)) + case _ => + val inputKeys = filterKeys( + FlinkRelMetadataQuery + .reuseOrCreate(mq) + .getUpsertKeys(rel.getInput), + rel.partitionKey) + FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys) + } } def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] = @@ -104,10 +109,6 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] { FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput), ImmutableBitSet.of(rel.getCollation.getKeys)) - def getUpsertKeys(rel: StreamPhysicalDeduplicate, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { - ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList)) - } - def getUpsertKeys( rel: StreamPhysicalChangelogNormalize, mq: RelMetadataQuery): JSet[ImmutableBitSet] = { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml index 6ea1d56dc73..f6abe90d2d0 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml @@ -1176,7 +1176,7 @@ LogicalProject(c=[$0], b=[$1], d=[$2]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d]) +Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d]) +- Exchange(distribution=[hash[c, b]]) +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM_RETRACT(a) FILTER $f3 AS d]) +- Exchange(distribution=[hash[c, b]])
