This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4642c1f47f40637d82225a259e3fa474a78a8586
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jun 23 17:49:35 2021 +0800

    [FLINK-23054][table] Rank update optimization should be based on upsert key
---
 .../table/planner/plan/utils/RankProcessStrategy.java    | 16 ++++++++--------
 .../plan/rules/logical/CalcRankTransposeRule.scala       |  8 +++++---
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java
index 074b45d..2d4221e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.commons.lang3.StringUtils;
@@ -135,17 +134,18 @@ public interface RankProcessStrategy {
     static List<RankProcessStrategy> analyzeRankProcessStrategies(
             StreamPhysicalRel rank, ImmutableBitSet partitionKey, RelCollation 
orderKey) {
 
-        RelMetadataQuery mq = rank.getCluster().getMetadataQuery();
+        FlinkRelMetadataQuery mq = (FlinkRelMetadataQuery) 
rank.getCluster().getMetadataQuery();
         List<RelFieldCollation> fieldCollations = 
orderKey.getFieldCollations();
         boolean isUpdateStream = !ChangelogPlanUtils.inputInsertOnly(rank);
         RelNode input = rank.getInput(0);
 
         if (isUpdateStream) {
-            Set<ImmutableBitSet> uniqueKeys = mq.getUniqueKeys(input);
-            if (uniqueKeys == null
-                    || uniqueKeys.isEmpty()
-                    // unique key should contains partition key
-                    || uniqueKeys.stream().noneMatch(k -> 
k.contains(partitionKey))) {
+            Set<ImmutableBitSet> upsertKeys =
+                    mq.getUpsertKeysInKeyGroupRange(input, 
partitionKey.toArray());
+            if (upsertKeys == null
+                    || upsertKeys.isEmpty()
+                    // upsert key should contain partition key
+                    || upsertKeys.stream().noneMatch(k -> 
k.contains(partitionKey))) {
                 // and we fall back to using retract rank
                 return Collections.singletonList(RETRACT_STRATEGY);
             } else {
@@ -197,7 +197,7 @@ public interface RankProcessStrategy {
                 if (isMonotonic) {
                     // TODO: choose a set of primary key
                     return Arrays.asList(
-                            new 
UpdateFastStrategy(uniqueKeys.iterator().next().toArray()),
+                            new 
UpdateFastStrategy(upsertKeys.iterator().next().toArray()),
                             RETRACT_STRATEGY);
                 } else {
                     return Collections.singletonList(RETRACT_STRATEGY);
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala
index d47469a..26b255c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.planner.plan.rules.logical
 
 import org.apache.flink.table.api.TableException
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, RankUtil}
 import org.apache.flink.table.runtime.operators.rank.VariableRankRange
@@ -116,11 +117,12 @@ class CalcRankTransposeRule
   private def getKeyFields(rank: FlinkLogicalRank): Array[Int] = {
     val partitionKey = rank.partitionKey.toArray
     val orderKey = 
rank.orderKey.getFieldCollations.map(_.getFieldIndex).toArray
-    val uniqueKeys = 
rank.getCluster.getMetadataQuery.getUniqueKeys(rank.getInput)
-    val keysInUniqueKeys = if (uniqueKeys == null || uniqueKeys.isEmpty) {
+    val upsertKeys = 
FlinkRelMetadataQuery.reuseOrCreate(rank.getCluster.getMetadataQuery)
+        .getUpsertKeysInKeyGroupRange(rank.getInput, partitionKey)
+    val keysInUniqueKeys = if (upsertKeys == null || upsertKeys.isEmpty) {
       Array[Int]()
     } else {
-      uniqueKeys.flatMap(_.toArray).toArray
+      upsertKeys.flatMap(_.toArray).toArray
     }
     val rankRangeKey = rank.rankRange match {
       case v: VariableRankRange => Array(v.getRankEndIndex)

Reply via email to