This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4642c1f47f40637d82225a259e3fa474a78a8586 Author: JingsongLi <[email protected]> AuthorDate: Wed Jun 23 17:49:35 2021 +0800 [FLINK-23054][table] Rank update optimization should based on upsert key --- .../table/planner/plan/utils/RankProcessStrategy.java | 16 ++++++++-------- .../plan/rules/logical/CalcRankTransposeRule.scala | 8 +++++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java index 074b45d..2d4221e 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RankProcessStrategy.java @@ -34,7 +34,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.sql.validate.SqlMonotonicity; import org.apache.calcite.util.ImmutableBitSet; import org.apache.commons.lang3.StringUtils; @@ -135,17 +134,18 @@ public interface RankProcessStrategy { static List<RankProcessStrategy> analyzeRankProcessStrategies( StreamPhysicalRel rank, ImmutableBitSet partitionKey, RelCollation orderKey) { - RelMetadataQuery mq = rank.getCluster().getMetadataQuery(); + FlinkRelMetadataQuery mq = (FlinkRelMetadataQuery) rank.getCluster().getMetadataQuery(); List<RelFieldCollation> fieldCollations = orderKey.getFieldCollations(); boolean isUpdateStream = !ChangelogPlanUtils.inputInsertOnly(rank); RelNode input = rank.getInput(0); if (isUpdateStream) { - Set<ImmutableBitSet> uniqueKeys = mq.getUniqueKeys(input); - if (uniqueKeys == null - || uniqueKeys.isEmpty() - // unique key should contains partition key - || uniqueKeys.stream().noneMatch(k -> k.contains(partitionKey))) { + Set<ImmutableBitSet> upsertKeys = + mq.getUpsertKeysInKeyGroupRange(input, partitionKey.toArray()); + if (upsertKeys == null + || upsertKeys.isEmpty() + // upsert key should contains partition key + || upsertKeys.stream().noneMatch(k -> k.contains(partitionKey))) { // and we fall back to using retract rank return Collections.singletonList(RETRACT_STRATEGY); } else { @@ -197,7 +197,7 @@ public interface RankProcessStrategy { if (isMonotonic) { // TODO: choose a set of primary key return Arrays.asList( - new UpdateFastStrategy(uniqueKeys.iterator().next().toArray()), + new UpdateFastStrategy(upsertKeys.iterator().next().toArray()), RETRACT_STRATEGY); } else { return Collections.singletonList(RETRACT_STRATEGY); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala index d47469a..26b255c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcRankTransposeRule.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.rules.logical import org.apache.flink.table.api.TableException +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.logical._ import org.apache.flink.table.planner.plan.utils.{FlinkRexUtil, RankUtil} import org.apache.flink.table.runtime.operators.rank.VariableRankRange @@ -116,11 +117,12 @@ class CalcRankTransposeRule private def getKeyFields(rank: FlinkLogicalRank): Array[Int] = { val partitionKey = rank.partitionKey.toArray val orderKey = rank.orderKey.getFieldCollations.map(_.getFieldIndex).toArray - val uniqueKeys = rank.getCluster.getMetadataQuery.getUniqueKeys(rank.getInput) - val keysInUniqueKeys = if (uniqueKeys == null || uniqueKeys.isEmpty) { + val upsertKeys = FlinkRelMetadataQuery.reuseOrCreate(rank.getCluster.getMetadataQuery) + .getUpsertKeysInKeyGroupRange(rank.getInput, partitionKey) + val keysInUniqueKeys = if (upsertKeys == null || upsertKeys.isEmpty) { Array[Int]() } else { - uniqueKeys.flatMap(_.toArray).toArray + upsertKeys.flatMap(_.toArray).toArray } val rankRangeKey = rank.rankRange match { case v: VariableRankRange => Array(v.getRankEndIndex)
