This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9df9dce57e6331d4b7bfd130d7f637d9db1cebfd
Author: lincoln lee <[email protected]>
AuthorDate: Tue Sep 24 22:40:12 2024 +0800

    [FLINK-34702][table-planner] Remove StreamPhysicalDeduplicate from FlinkRelMdColumnUniqueness
---
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 39 ++++++++++------------
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 39 +++++++++++++++-------
 2 files changed, 45 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 619d5c70940..d6340831dac 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -251,21 +251,26 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
       mq: RelMetadataQuery,
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = {
-    val input = rank.getInput
-    val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
-    if (rankFunColumnIndex < 0) {
-      mq.areColumnsUnique(input, columns, ignoreNulls)
+    if (RankUtil.isDeduplication(rank)) {
+      columns != null && util.Arrays.equals(columns.toArray, rank.partitionKey.toArray)
     } else {
-      val childColumns = columns.clear(rankFunColumnIndex)
-      val isChildColumnsUnique = mq.areColumnsUnique(input, childColumns, ignoreNulls)
-      if (isChildColumnsUnique != null && isChildColumnsUnique) {
-        true
+      val input = rank.getInput
+
+      val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
+      if (rankFunColumnIndex < 0) {
+        mq.areColumnsUnique(input, columns, ignoreNulls)
       } else {
-        rank.rankType match {
-          case RankType.ROW_NUMBER =>
-            val fields = columns.toArray
-            (rank.partitionKey.toArray :+ rankFunColumnIndex).forall(fields.contains(_))
-          case _ => false
+        val childColumns = columns.clear(rankFunColumnIndex)
+        val isChildColumnsUnique = mq.areColumnsUnique(input, childColumns, ignoreNulls)
+        if (isChildColumnsUnique != null && isChildColumnsUnique) {
+          true
+        } else {
+          rank.rankType match {
+            case RankType.ROW_NUMBER =>
+              val fields = columns.toArray
+              (rank.partitionKey.toArray :+ rankFunColumnIndex).forall(fields.contains(_))
+            case _ => false
+          }
         }
       }
     }
@@ -277,14 +282,6 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean =
     mq.areColumnsUnique(rel.getInput, columns, ignoreNulls)
-  def areColumnsUnique(
-      rel: StreamPhysicalDeduplicate,
-      mq: RelMetadataQuery,
-      columns: ImmutableBitSet,
-      ignoreNulls: Boolean): JBoolean = {
-    columns != null && util.Arrays.equals(columns.toArray, rel.getUniqueKeys)
-  }
-
   def areColumnsUnique(
       rel: StreamPhysicalChangelogNormalize,
       mq: RelMetadataQuery,
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 83cd4f011b3..e9a224da2d3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -666,7 +666,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
@@ -755,7 +755,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
@@ -808,7 +808,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rn", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRowNumber, flinkLogicalRowNumber, streamRowNumber)
@@ -838,6 +838,7 @@ class FlinkRelMdHandlerTestBase {
   //  select a, b, c, rowtime
   //  ROW_NUMBER() over (partition by b, c order by rowtime desc) rn from TemporalTable3
   // ) t where rn <= 1
+  // can be merged into rank
   protected lazy val (streamRowTimeDeduplicateFirstRow, streamRowTimeDeduplicateLastRow) = {
     buildFirstRowAndLastRowDeduplicateNode(true)
   }
@@ -848,13 +849,18 @@ class FlinkRelMdHandlerTestBase {
     val hash1 = FlinkRelDistribution.hash(Array(1), requireStrict = true)
     val streamExchange1 =
       new StreamPhysicalExchange(cluster, scan.getTraitSet.replace(hash1), scan, hash1)
-    val firstRow = new StreamPhysicalDeduplicate(
+    val firstRow = new StreamPhysicalRank(
       cluster,
       streamPhysicalTraits,
       streamExchange1,
-      Array(1),
-      isRowtime,
-      keepLastRow = false
+      ImmutableBitSet.of(1),
+      RelCollations.of(3),
+      RankType.ROW_NUMBER,
+      new ConstantRankRange(1, 1),
+      new RelDataTypeFieldImpl("rn", 7, longType),
+      outputRankNumber = false,
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowTime = false
     )
 
     val builder = typeFactory.builder()
@@ -877,13 +883,22 @@ class FlinkRelMdHandlerTestBase {
     val hash12 = FlinkRelDistribution.hash(Array(1, 2), requireStrict = true)
     val streamExchange2 =
       new BatchPhysicalExchange(cluster, scan.getTraitSet.replace(hash12), scan, hash12)
-    val lastRow = new StreamPhysicalDeduplicate(
+    val lastRow = new StreamPhysicalRank(
       cluster,
       streamPhysicalTraits,
       streamExchange2,
-      Array(1, 2),
-      isRowtime,
-      keepLastRow = true
+      ImmutableBitSet.of(1, 2),
+      RelCollations.of(
+        new RelFieldCollation(
+          3,
+          RelFieldCollation.Direction.DESCENDING,
+          RelFieldCollation.NullDirection.FIRST)),
+      RankType.ROW_NUMBER,
+      new ConstantRankRange(1, 1),
+      new RelDataTypeFieldImpl("rn", 7, longType),
+      outputRankNumber = false,
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowTime = false
     )
     val calcOfLastRow = new StreamPhysicalCalc(
       cluster,
@@ -966,7 +981,7 @@ class FlinkRelMdHandlerTestBase {
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
       RankProcessStrategy.UNDEFINED_STRATEGY,
-      sortOnRowtime = false
+      sortOnRowTime = false
     )
 
     (logicalRankWithVariableRange, flinkLogicalRankWithVariableRange, streamRankWithVariableRange)
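
For readers following the FLINK-34702 change: the uniqueness rule that previously lived in the dedicated StreamPhysicalDeduplicate handler now sits behind the RankUtil.isDeduplication(rank) branch of the rank handler. The following is a minimal, self-contained Scala sketch of that rule only; the simplified Rank case class, the Option-based return value, and the inputColumnsUnique callback are illustrative stand-ins, not Flink planner APIs.

// Stand-alone sketch (assumptions noted above), not the planner implementation.
object ColumnUniquenessSketch {

  // Hypothetical, simplified stand-in for a rank node.
  final case class Rank(
      partitionKey: Set[Int],        // partition columns of the rank
      rankNumberColumn: Option[Int], // index of the emitted rank column, if output
      isRowNumber: Boolean,          // ROW_NUMBER rank type
      isDeduplication: Boolean)      // rank treated as a deduplication

  // Mirrors the three-valued result: Some(true)/Some(false) when decidable, None when unknown.
  def areColumnsUnique(
      rank: Rank,
      columns: Set[Int],
      inputColumnsUnique: Set[Int] => Option[Boolean]): Option[Boolean] = {
    if (rank.isDeduplication) {
      // A deduplication keeps one row per partition key, so a column set is unique
      // exactly when it equals the partition key.
      Some(columns == rank.partitionKey)
    } else {
      rank.rankNumberColumn match {
        case None => inputColumnsUnique(columns)
        case Some(rk) =>
          val childColumns = columns - rk
          inputColumnsUnique(childColumns) match {
            case Some(true) => Some(true)
            case _ =>
              // For ROW_NUMBER, a column set containing the full partition key
              // plus the rank column is unique.
              if (rank.isRowNumber) Some((rank.partitionKey + rk).subsetOf(columns))
              else Some(false)
          }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val dedup = Rank(Set(1), None, isRowNumber = true, isDeduplication = true)
    println(areColumnsUnique(dedup, Set(1), _ => None))    // Some(true): exactly the partition key
    println(areColumnsUnique(dedup, Set(0, 1), _ => None)) // Some(false): a superset is not enough
  }
}

Because a deduplicate-style rank reports uniqueness only for the exact partition key and every other case falls through to the regular rank/ROW_NUMBER reasoning, the separate StreamPhysicalDeduplicate handler becomes redundant, which is what this commit removes.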
