This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9df9dce57e6331d4b7bfd130d7f637d9db1cebfd
Author: lincoln lee <[email protected]>
AuthorDate: Tue Sep 24 22:40:12 2024 +0800

    [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from 
FlinkRelMdColumnUniqueness
---
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 39 ++++++++++------------
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 39 +++++++++++++++-------
 2 files changed, 45 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 619d5c70940..d6340831dac 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -251,21 +251,26 @@ class FlinkRelMdColumnUniqueness private extends 
MetadataHandler[BuiltInMetadata
       mq: RelMetadataQuery,
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = {
-    val input = rank.getInput
-    val rankFunColumnIndex = 
RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
-    if (rankFunColumnIndex < 0) {
-      mq.areColumnsUnique(input, columns, ignoreNulls)
+    if (RankUtil.isDeduplication(rank)) {
+      columns != null && util.Arrays.equals(columns.toArray, 
rank.partitionKey.toArray)
     } else {
-      val childColumns = columns.clear(rankFunColumnIndex)
-      val isChildColumnsUnique = mq.areColumnsUnique(input, childColumns, 
ignoreNulls)
-      if (isChildColumnsUnique != null && isChildColumnsUnique) {
-        true
+      val input = rank.getInput
+
+      val rankFunColumnIndex = 
RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
+      if (rankFunColumnIndex < 0) {
+        mq.areColumnsUnique(input, columns, ignoreNulls)
       } else {
-        rank.rankType match {
-          case RankType.ROW_NUMBER =>
-            val fields = columns.toArray
-            (rank.partitionKey.toArray :+ 
rankFunColumnIndex).forall(fields.contains(_))
-          case _ => false
+        val childColumns = columns.clear(rankFunColumnIndex)
+        val isChildColumnsUnique = mq.areColumnsUnique(input, childColumns, 
ignoreNulls)
+        if (isChildColumnsUnique != null && isChildColumnsUnique) {
+          true
+        } else {
+          rank.rankType match {
+            case RankType.ROW_NUMBER =>
+              val fields = columns.toArray
+              (rank.partitionKey.toArray :+ 
rankFunColumnIndex).forall(fields.contains(_))
+            case _ => false
+          }
         }
       }
     }
@@ -277,14 +282,6 @@ class FlinkRelMdColumnUniqueness private extends 
MetadataHandler[BuiltInMetadata
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = mq.areColumnsUnique(rel.getInput, 
columns, ignoreNulls)
 
-  def areColumnsUnique(
-      rel: StreamPhysicalDeduplicate,
-      mq: RelMetadataQuery,
-      columns: ImmutableBitSet,
-      ignoreNulls: Boolean): JBoolean = {
-    columns != null && util.Arrays.equals(columns.toArray, rel.getUniqueKeys)
-  }
-
   def areColumnsUnique(
       rel: StreamPhysicalChangelogNormalize,
       mq: RelMetadataQuery,
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 83cd4f011b3..e9a224da2d3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -666,7 +666,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, 
streamRank)
@@ -755,7 +755,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, 
streamRank)
@@ -808,7 +808,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rn", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRowNumber, flinkLogicalRowNumber, streamRowNumber)
@@ -838,6 +838,7 @@ class FlinkRelMdHandlerTestBase {
   //  select a, b, c, rowtime
   //  ROW_NUMBER() over (partition by b, c order by rowtime desc) rn from 
TemporalTable3
   // ) t where rn <= 1
+  // can be merged into rank
   protected lazy val (streamRowTimeDeduplicateFirstRow, 
streamRowTimeDeduplicateLastRow) = {
     buildFirstRowAndLastRowDeduplicateNode(true)
   }
@@ -848,13 +849,18 @@ class FlinkRelMdHandlerTestBase {
     val hash1 = FlinkRelDistribution.hash(Array(1), requireStrict = true)
     val streamExchange1 =
       new StreamPhysicalExchange(cluster, scan.getTraitSet.replace(hash1), 
scan, hash1)
-    val firstRow = new StreamPhysicalDeduplicate(
+    val firstRow = new StreamPhysicalRank(
       cluster,
       streamPhysicalTraits,
       streamExchange1,
-      Array(1),
-      isRowtime,
-      keepLastRow = false
+      ImmutableBitSet.of(1),
+      RelCollations.of(3),
+      RankType.ROW_NUMBER,
+      new ConstantRankRange(1, 1),
+      new RelDataTypeFieldImpl("rn", 7, longType),
+      outputRankNumber = false,
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowTime = false
     )
 
     val builder = typeFactory.builder()
@@ -877,13 +883,22 @@ class FlinkRelMdHandlerTestBase {
     val hash12 = FlinkRelDistribution.hash(Array(1, 2), requireStrict = true)
     val streamExchange2 =
       new BatchPhysicalExchange(cluster, scan.getTraitSet.replace(hash12), 
scan, hash12)
-    val lastRow = new StreamPhysicalDeduplicate(
+    val lastRow = new StreamPhysicalRank(
       cluster,
       streamPhysicalTraits,
       streamExchange2,
-      Array(1, 2),
-      isRowtime,
-      keepLastRow = true
+      ImmutableBitSet.of(1, 2),
+      RelCollations.of(
+        new RelFieldCollation(
+          3,
+          RelFieldCollation.Direction.DESCENDING,
+          RelFieldCollation.NullDirection.FIRST)),
+      RankType.ROW_NUMBER,
+      new ConstantRankRange(1, 1),
+      new RelDataTypeFieldImpl("rn", 7, longType),
+      outputRankNumber = false,
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowTime = false
     )
     val calcOfLastRow = new StreamPhysicalCalc(
       cluster,
@@ -966,7 +981,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRankWithVariableRange, flinkLogicalRankWithVariableRange, 
streamRankWithVariableRange)

Reply via email to