This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e81b1089d17fe4271d8c34a7cd9d0b61072a29bb
Author: lincoln lee <[email protected]>
AuthorDate: Tue Sep 24 22:47:24 2024 +0800

    [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdUniqueKeys
---
 .../plan/metadata/FlinkRelMdUniqueKeys.scala       | 23 +++-------------------
 1 file changed, 3 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index e171925351d..492f3c6fb93 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, 
TableSourceTable}
 import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, RankUtil}
 import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
-import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
+import org.apache.flink.table.runtime.operators.rank.RankType
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts
 
 import com.google.common.collect.ImmutableSet
@@ -290,19 +290,9 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
 
   def getRankUniqueKeys(rel: Rank, inputKeys: JSet[ImmutableBitSet]): 
JSet[ImmutableBitSet] = {
     val rankFunColumnIndex = 
RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1)
-    // for Rank node that can convert to Deduplicate, unique key is partition key
-    val canConvertToDeduplicate: Boolean = {
-      val rankRange = rel.rankRange
-      val isRowNumberType = rel.rankType == RankType.ROW_NUMBER
-      val isLimit1 = rankRange match {
-        case rankRange: ConstantRankRange =>
-          rankRange.getRankStart == 1 && rankRange.getRankEnd == 1
-        case _ => false
-      }
-      isRowNumberType && isLimit1
-    }
 
-    if (canConvertToDeduplicate) {
+    if (RankUtil.isDeduplication(rel)) {
+      // for Rank node that can convert to Deduplicate, unique key is partition key
       val retSet = new JHashSet[ImmutableBitSet]
       retSet.add(rel.partitionKey)
       retSet
@@ -325,13 +315,6 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
   def getUniqueKeys(rel: Sort, mq: RelMetadataQuery, ignoreNulls: Boolean): 
JSet[ImmutableBitSet] =
     mq.getUniqueKeys(rel.getInput, ignoreNulls)
 
-  def getUniqueKeys(
-      rel: StreamPhysicalDeduplicate,
-      mq: RelMetadataQuery,
-      ignoreNulls: Boolean): JSet[ImmutableBitSet] = {
-    ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList))
-  }
-
   def getUniqueKeys(
       rel: StreamPhysicalChangelogNormalize,
       mq: RelMetadataQuery,

Reply via email to