This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e81b1089d17fe4271d8c34a7cd9d0b61072a29bb Author: lincoln lee <[email protected]> AuthorDate: Tue Sep 24 22:47:24 2024 +0800 [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdUniqueKeys --- .../plan/metadata/FlinkRelMdUniqueKeys.scala | 23 +++------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala index e171925351d..492f3c6fb93 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream._ import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, TableSourceTable} import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, RankUtil} import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty -import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankType} +import org.apache.flink.table.runtime.operators.rank.RankType import org.apache.flink.table.types.logical.utils.LogicalTypeCasts import com.google.common.collect.ImmutableSet @@ -290,19 +290,9 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu def getRankUniqueKeys(rel: Rank, inputKeys: JSet[ImmutableBitSet]): JSet[ImmutableBitSet] = { val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rel).getOrElse(-1) - // for Rank node that can convert to Deduplicate, unique key is partition key - val canConvertToDeduplicate: Boolean = { - val rankRange = rel.rankRange - val isRowNumberType = rel.rankType == RankType.ROW_NUMBER - val isLimit1 = rankRange match { - 
case rankRange: ConstantRankRange => - rankRange.getRankStart == 1 && rankRange.getRankEnd == 1 - case _ => false - } - isRowNumberType && isLimit1 - } - if (canConvertToDeduplicate) { + if (RankUtil.isDeduplication(rel)) { + // for Rank node that can convert to Deduplicate, unique key is partition key val retSet = new JHashSet[ImmutableBitSet] retSet.add(rel.partitionKey) retSet @@ -325,13 +315,6 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu def getUniqueKeys(rel: Sort, mq: RelMetadataQuery, ignoreNulls: Boolean): JSet[ImmutableBitSet] = mq.getUniqueKeys(rel.getInput, ignoreNulls) - def getUniqueKeys( - rel: StreamPhysicalDeduplicate, - mq: RelMetadataQuery, - ignoreNulls: Boolean): JSet[ImmutableBitSet] = { - ImmutableSet.of(ImmutableBitSet.of(rel.getUniqueKeys.map(Integer.valueOf).toList)) - } - def getUniqueKeys( rel: StreamPhysicalChangelogNormalize, mq: RelMetadataQuery,
