This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a134711aeca205aabfe3ce04c97e1f931c5a5a64
Author: lincoln lee <[email protected]>
AuthorDate: Tue Sep 24 23:36:04 2024 +0800

    [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdUpsertKeys
---
 .../plan/metadata/FlinkRelMdUpsertKeys.scala       | 23 +++++++++++-----------
 .../table/planner/plan/stream/sql/RankTest.xml     |  2 +-
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
index 358862826a2..df151c8a607 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalGr
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.schema.IntermediateRelTable
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil
+import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, RankUtil}
 
 import com.google.common.collect.ImmutableSet
 import org.apache.calcite.plan.hep.HepRelVertex
@@ -91,12 +91,17 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
   }
 
   def getUpsertKeys(rel: Rank, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
-    val inputKeys = filterKeys(
-      FlinkRelMetadataQuery
-        .reuseOrCreate(mq)
-        .getUpsertKeys(rel.getInput),
-      rel.partitionKey)
-    FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys)
+    rel match {
+      case rank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
+        ImmutableSet.of(ImmutableBitSet.of(rank.partitionKey.toArray.map(Integer.valueOf).toList))
+      case _ =>
+        val inputKeys = filterKeys(
+          FlinkRelMetadataQuery
+            .reuseOrCreate(mq)
+            .getUpsertKeys(rel.getInput),
+          rel.partitionKey)
+        FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys)
+    }
   }
 
   def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] =
@@ -104,10 +109,6 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput),
       ImmutableBitSet.of(rel.getCollation.getKeys))
 
-  def getUpsertKeys(rel: StreamPhysicalDeduplicate, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
-    ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList))
-  }
-
   def getUpsertKeys(
       rel: StreamPhysicalChangelogNormalize,
       mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 6ea1d56dc73..f6abe90d2d0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -1176,7 +1176,7 @@ LogicalProject(c=[$0], b=[$1], d=[$2])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d])
+Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d])
 +- Exchange(distribution=[hash[c, b]])
   +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM_RETRACT(a) FILTER $f3 AS d])
       +- Exchange(distribution=[hash[c, b]])
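
Editor's note: the removed StreamPhysicalDeduplicate handler and the new StreamPhysicalRank branch encode the same rule, namely that a deduplication's single upsert key is exactly its set of partition-key columns. Below is a minimal, self-contained Scala sketch of that rule; the object and method names are hypothetical and not part of the patch, and only the Calcite and Guava types already imported in FlinkRelMdUpsertKeys.scala are used.

import com.google.common.collect.ImmutableSet
import org.apache.calcite.util.ImmutableBitSet

// Hypothetical standalone sketch, not part of the patch: for a
// deduplication-style Rank, the single upsert key is the set of
// partition-key column indices (compare the new StreamPhysicalRank
// branch in getUpsertKeys above).
object DedupUpsertKeySketch {

  def upsertKeysForDedup(partitionKeyIndices: Array[Int]): ImmutableSet[ImmutableBitSet] =
    ImmutableSet.of(ImmutableBitSet.of(partitionKeyIndices: _*))

  def main(args: Array[String]): Unit = {
    // Partitioning on columns 0 and 1 mirrors UpdateFastStrategy[0,1]
    // in the updated RankTest.xml plan above.
    println(upsertKeysForDedup(Array(0, 1))) // prints [{0, 1}]
  }
}

Presumably, expressing the rule on Rank (guarded by RankUtil.isDeduplication) lets it fire for any Rank the planner recognizes as a deduplication, which would explain the RetractStrategy to UpdateFastStrategy[0,1] change in the test expectation.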
