This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1543c72455629842a9424aa11ea7641fb5d13760 Author: lincoln lee <[email protected]> AuthorDate: Tue Sep 24 10:24:42 2024 +0800 [FLINK-34702][table-planner] Avoid using StreamPhysicalDeduplicate and instead decide to perform exec node transformations in StreamPhysicalRank --- .../stream/StreamPhysicalDeduplicate.scala | 7 +- .../nodes/physical/stream/StreamPhysicalRank.scala | 71 +++++++++---- .../planner/plan/rules/FlinkStreamRuleSets.scala | 1 - .../stream/StreamPhysicalDeduplicateRule.scala | 99 ----------------- .../physical/stream/StreamPhysicalRankRule.scala | 12 +-- .../flink/table/planner/plan/utils/RankUtil.scala | 54 ++++++++-- .../nodes/exec/operator/StreamOperatorNameTest.xml | 4 +- .../physical/stream/ChangelogModeInferenceTest.xml | 2 +- .../planner/plan/stream/sql/DeduplicateTest.xml | 33 +++++- .../plan/stream/sql/NonDeterministicDagTest.xml | 105 ++++++++++++++++++ .../table/planner/plan/stream/sql/RankTest.xml | 8 +- .../plan/stream/sql/agg/GroupWindowTest.xml | 2 +- .../plan/stream/sql/agg/WindowAggregateTest.xml | 8 +- .../plan/metadata/FlinkRelMdHandlerTestBase.scala | 12 ++- .../planner/plan/stream/sql/DeduplicateTest.scala | 26 +++++ .../plan/stream/sql/NonDeterministicDagTest.scala | 118 ++++++++++++--------- .../table/planner/plan/stream/sql/RankTest.scala | 17 +-- .../plan/stream/sql/agg/WindowAggregateTest.scala | 2 +- 18 files changed, 361 insertions(+), 220 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala index d732e21d0c0..dbc5e60043d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala @@ -30,9 +30,10 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import java.util /** - * Stream physical RelNode which deduplicate on keys and keeps only first row or last row. This node - * is an optimization of [[StreamPhysicalRank]] for some special cases. Compared to - * [[StreamPhysicalRank]], this node could use mini-batch and access less state. + * TODO to be removed after FLINK-34702 is fixed. Stream physical RelNode which deduplicate on keys + * and keeps only first row or last row. This node is an optimization of [[StreamPhysicalRank]] for + * some special cases. Compared to [[StreamPhysicalRank]], this node could use mini-batch and access + * less state. 
*/ class StreamPhysicalDeduplicate( cluster: RelOptCluster, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala index 534c9dda8d2..084d59d9893 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala @@ -21,7 +21,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.calcite.Rank import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank +import org.apache.flink.table.planner.plan.nodes.exec.stream.{StreamExecDeduplicate, StreamExecRank} import org.apache.flink.table.planner.plan.utils._ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.flink.table.runtime.operators.rank._ @@ -46,8 +46,9 @@ class StreamPhysicalRank( rankRange: RankRange, rankNumberType: RelDataTypeField, outputRankNumber: Boolean, - val rankStrategy: RankProcessStrategy) - extends Rank( + val rankStrategy: RankProcessStrategy, + val sortOnRowTime: Boolean +) extends Rank( cluster, traitSet, inputRel, @@ -59,7 +60,7 @@ class StreamPhysicalRank( outputRankNumber) with StreamPhysicalRel { - override def requireWatermark: Boolean = false + override def requireWatermark: Boolean = sortOnRowTime override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new StreamPhysicalRank( @@ -72,7 +73,9 @@ class StreamPhysicalRank( rankRange, rankNumberType, outputRankNumber, - rankStrategy) + rankStrategy, + sortOnRowTime + ) } def copy(newStrategy: RankProcessStrategy): StreamPhysicalRank = { @@ -86,7 +89,9 @@ class StreamPhysicalRank( rankRange, rankNumberType, outputRankNumber, - newStrategy) + newStrategy, + sortOnRowTime + ) } override def explainTerms(pw: RelWriter): RelWriter = { @@ -96,24 +101,50 @@ class StreamPhysicalRank( .item("rankType", rankType) .item("rankRange", rankRange.toString(inputRowType.getFieldNames)) .item("partitionBy", RelExplainUtil.fieldToString(partitionKey.toArray, inputRowType)) - .item("orderBy", RelExplainUtil.collationToString(orderKey, inputRowType)) + .item( + "orderBy", + (if (sortOnRowTime) { + "ROWTIME " + } else "") + RelExplainUtil.collationToString(orderKey, inputRowType)) .item("select", getRowType.getFieldNames.mkString(", ")) } + private def getDeduplicateDescription(isRowtime: Boolean, isLastRow: Boolean): String = { + val fieldNames = getRowType.getFieldNames + val orderString = if (isRowtime) "ROWTIME" else "PROCTIME" + val keep = if (isLastRow) "LastRow" else "FirstRow" + s"Deduplicate(keep=[$keep], key=[${partitionKey.toArray.map(fieldNames.get).mkString(", ")}], order=[$orderString])" + } + override def translateToExecNode(): ExecNode[_] = { val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this) - val fieldCollations = orderKey.getFieldCollations - new StreamExecRank( - unwrapTableConfig(this), - rankType, - new PartitionSpec(partitionKey.toArray), - SortUtil.getSortSpec(fieldCollations), - rankRange, - rankStrategy, - outputRankNumber, - 
generateUpdateBefore, - InputProperty.DEFAULT, - FlinkTypeFactory.toLogicalRowType(getRowType), - getRelDetailedDescription) + + if (RankUtil.canConvertToDeduplicate(this)) { + val keepLastRow = RankUtil.keepLastRow(orderKey) + + new StreamExecDeduplicate( + unwrapTableConfig(this), + partitionKey.toArray, + sortOnRowTime, + keepLastRow, + generateUpdateBefore, + InputProperty.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getDeduplicateDescription(sortOnRowTime, keepLastRow)) + } else { + new StreamExecRank( + unwrapTableConfig(this), + rankType, + new PartitionSpec(partitionKey.toArray), + SortUtil.getSortSpec(orderKey.getFieldCollations), + rankRange, + rankStrategy, + outputRankNumber, + generateUpdateBefore, + InputProperty.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) + } } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 50b30890333..109a982d216 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -444,7 +444,6 @@ object FlinkStreamRuleSets { StreamPhysicalTemporalSortRule.INSTANCE, // rank StreamPhysicalRankRule.INSTANCE, - StreamPhysicalDeduplicateRule.INSTANCE, // expand StreamPhysicalExpandRule.INSTANCE, // group agg diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala deleted file mode 100644 index e232a4646be..00000000000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.planner.plan.rules.physical.stream - -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution -import org.apache.flink.table.planner.plan.nodes.FlinkConventions -import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank -import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank} -import org.apache.flink.table.planner.plan.utils.RankUtil - -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rel.convert.ConverterRule.Config - -/** - * Rule that matches [[FlinkLogicalRank]] which is sorted by time attribute and limits 1 and its - * rank type is ROW_NUMBER, and converts it to [[StreamPhysicalDeduplicate]]. - * - * NOTES: Queries that can be converted to [[StreamPhysicalDeduplicate]] could be converted to - * [[StreamPhysicalRank]] too. [[StreamPhysicalDeduplicate]] is more efficient than - * [[StreamPhysicalRank]] due to mini-batch and less state access. - * - * e.g. - * 1. {{{ SELECT a, b, c FROM ( SELECT a, b, c, proctime, ROW_NUMBER() OVER (PARTITION BY a ORDER - * BY proctime ASC) as row_num FROM MyTable ) WHERE row_num <= 1 }}} will be converted to - * StreamExecDeduplicate which keeps first row in proctime. - * - * 2. {{{ SELECT a, b, c FROM ( SELECT a, b, c, rowtime, ROW_NUMBER() OVER (PARTITION BY a ORDER BY - * rowtime DESC) as row_num FROM MyTable ) WHERE row_num <= 1 }}} will be converted to - * StreamExecDeduplicate which keeps last row in rowtime. - */ -class StreamPhysicalDeduplicateRule(config: Config) extends ConverterRule(config) { - - override def matches(call: RelOptRuleCall): Boolean = { - val rank: FlinkLogicalRank = call.rel(0) - RankUtil.canConvertToDeduplicate(rank) - } - - override def convert(rel: RelNode): RelNode = { - val rank = rel.asInstanceOf[FlinkLogicalRank] - val requiredDistribution = if (rank.partitionKey.isEmpty) { - FlinkRelDistribution.SINGLETON - } else { - FlinkRelDistribution.hash(rank.partitionKey.toList) - } - val requiredTraitSet = rel.getCluster.getPlanner - .emptyTraitSet() - .replace(FlinkConventions.STREAM_PHYSICAL) - .replace(requiredDistribution) - val convInput: RelNode = RelOptRule.convert(rank.getInput, requiredTraitSet) - - // order by timeIndicator desc ==> lastRow, otherwise is firstRow - val fieldCollation = rank.orderKey.getFieldCollations.get(0) - val isLastRow = fieldCollation.direction.isDescending - - val fieldType = rank - .getInput() - .getRowType - .getFieldList - .get(fieldCollation.getFieldIndex) - .getType - val isRowtime = FlinkTypeFactory.isRowtimeIndicatorType(fieldType) - - val providedTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) - new StreamPhysicalDeduplicate( - rel.getCluster, - providedTraitSet, - convInput, - rank.partitionKey.toArray, - isRowtime, - isLastRow) - } -} - -object StreamPhysicalDeduplicateRule { - val INSTANCE = new StreamPhysicalDeduplicateRule( - Config.INSTANCE.withConversion( - classOf[FlinkLogicalRank], - FlinkConventions.LOGICAL, - FlinkConventions.STREAM_PHYSICAL, - "StreamPhysicalDeduplicateRule")) -} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala index 08c3616ae1f..0c878a8fdcc 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank} import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil} -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.plan.RelOptRule import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.convert.ConverterRule.Config @@ -34,11 +34,6 @@ import org.apache.calcite.rel.convert.ConverterRule.Config */ class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) { - override def matches(call: RelOptRuleCall): Boolean = { - val rank: FlinkLogicalRank = call.rel(0) - !RankUtil.canConvertToDeduplicate(rank) - } - override def convert(rel: RelNode): RelNode = { val rank = rel.asInstanceOf[FlinkLogicalRank] val input = rank.getInput @@ -53,6 +48,8 @@ class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) { val providedTraitSet = rank.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet) + val sortOnRowTime = RankUtil.sortOnRowTime(rank.orderKey, input.getRowType) + new StreamPhysicalRank( rank.getCluster, providedTraitSet, @@ -63,7 +60,8 @@ class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) { rank.rankRange, rank.rankNumberType, rank.outputRankNumber, - RankProcessStrategy.UNDEFINED_STRATEGY) + RankProcessStrategy.UNDEFINED_STRATEGY, + sortOnRowTime) } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala index 55325b70f0f..d0f1efef198 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala @@ -22,8 +22,8 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.ExpressionReducer import org.apache.flink.table.planner.plan.nodes.calcite.Rank import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate -import org.apache.flink.table.runtime.operators.rank._ +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank} +import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, ConstantRankRangeWithoutEnd, RankRange, RankType, VariableRankRange} import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.`type`.RelDataType @@ -350,21 +350,55 @@ object RankUtil { } val inputRowType = rank.getInput.getRowType - val isSortOnTimeAttribute = sortOnTimeAttribute(sortCollation, inputRowType) + val isSortOnTimeAttribute = 
sortOnTimeAttributeOnly(sortCollation, inputRowType) !rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType } - private def sortOnTimeAttribute( + private def sortOnTimeAttributeOnly( sortCollation: RelCollation, inputRowType: RelDataType): Boolean = { if (sortCollation.getFieldCollations.size() != 1) { - false - } else { - val firstSortField = sortCollation.getFieldCollations.get(0) - val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType - FlinkTypeFactory.isProctimeIndicatorType(fieldType) || - FlinkTypeFactory.isRowtimeIndicatorType(fieldType) + return false + } + val firstSortField = sortCollation.getFieldCollations.get(0) + val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType + FlinkTypeFactory.isProctimeIndicatorType(fieldType) || + FlinkTypeFactory.isRowtimeIndicatorType(fieldType) + } + + /** + * Checks if the given sort collation has a field collation which is based on a rowtime attribute. + */ + def sortOnRowTime(sortCollation: RelCollation, inputRowType: RelDataType): Boolean = { + sortCollation.getFieldCollations.exists { + firstSortField => + val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType + FlinkTypeFactory.isRowtimeIndicatorType(fieldType) + } + } + + /** Whether the given rank is logically a deduplication. */ + def isDeduplication(rank: Rank): Boolean = { + !rank.outputRankNumber && rank.rankType == RankType.ROW_NUMBER && isTop1(rank.rankRange) + } + + /** Whether the given [[StreamPhysicalRank]] could be converted to [[StreamExecDeduplicate]]. */ + def canConvertToDeduplicate(rank: StreamPhysicalRank): Boolean = { + lazy val inputInsertOnly = ChangelogPlanUtils.inputInsertOnly(rank) + lazy val sortOnTimeAttributeOnly = + RankUtil.sortOnTimeAttributeOnly(rank.orderKey, rank.getInput.getRowType) + + isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly + } + + /** Determines if the given order key indicates that the last row should be kept. 
*/ + def keepLastRow(orderKey: RelCollation): Boolean = { + // order by timeIndicator desc ==> lastRow, otherwise is firstRow + if (orderKey.getFieldCollations.size() != 1) { + return false } + val fieldCollation = orderKey.getFieldCollations.get(0) + fieldCollation.direction.isDescending } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml index 953a8b85d46..52538228c6b 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml @@ -282,7 +282,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2]) == Optimized Physical Plan == Calc(select=[a, b, c]) -+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME]) ++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime ASC], select=[a, b, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, b, c, rowtime]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) @@ -377,7 +377,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2]) == Optimized Physical Plan == Calc(select=[a, b, c]) -+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME]) ++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime ASC], select=[a, b, c, rowtime]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, b, c, rowtime]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml index 7719e16d90b..e07f6c5b402 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml @@ -321,7 +321,7 @@ Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS procti : +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime], changelogMode=[I]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]], fields=[amount, currency, rowtime], changelogMode=[I]) +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D]) - +- Deduplicate(keep=[LastRow], key=[currency], order=[ROWTIME], changelogMode=[I,UA,D]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[currency], orderBy=[ROWTIME rowtime DESC], select=[currency, rate, rowtime], changelogMode=[I,UA,D]) +- Exchange(distribution=[hash[currency]], changelogMode=[I]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, ratesHistory, source: [CollectionTableSource(currency, rate, 
rowtime)]]], fields=[currency, rate, rowtime], changelogMode=[I]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml index e5a52edd290..c23a3820a60 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml @@ -16,6 +16,35 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> + <TestCase name="testInvalidChangelogInput"> + <Resource name="sql"> + <![CDATA[ +SELECT * +FROM ( + SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY ts DESC) as rank_num + FROM cdc) +WHERE rank_num = 1 + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], rank_num=[$1]) ++- LogicalFilter(condition=[=($1, 1)]) + +- LogicalProject(a=[$0], rank_num=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $2 DESC NULLS LAST)]) + +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($2, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, 1 AS $1]) ++- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[ROWTIME ts DESC], select=[a, b, ts]) + +- Exchange(distribution=[hash[b]]) + +- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 5000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc]], fields=[a, b, ts]) +]]> + </Resource> + </TestCase> <TestCase name="testInvalidRowNumberConditionOnProctime"> <Resource name="sql"> <![CDATA[ @@ -65,7 +94,7 @@ LogicalProject(a=[$0], rank_num=[$1]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, 3 AS $1]) -+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=3, rankEnd=3], partitionBy=[b], orderBy=[rowtime DESC], select=[a, b, rowtime]) ++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=3, rankEnd=3], partitionBy=[b], orderBy=[ROWTIME rowtime DESC], select=[a, b, rowtime]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[a, b, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -89,7 +118,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2]) +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, ts, a]) - +- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME ts DESC], select=[a, b, ts]) +- Exchange(distribution=[hash[a]]) +- WatermarkAssigner(rowtime=[ts], watermark=[ts]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml index 646770d5f42..a0a00b51d83 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml @@ -2749,6 +2749,74 @@ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, EXPR$1 +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, b, proctime, CAST(a AS BIGINT) AS $3]) +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, proctime, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testProctimeDedupOnCdcWithMetadataSinkWithoutPk[nonDeterministicUpdateStrategy=IGNORE]"> + <Resource name="sql"> + <![CDATA[ +insert into sink_without_pk +SELECT a, metadata_3, c +FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum + FROM cdc_with_meta +) +WHERE rowNum = 1 + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c]) ++- LogicalProject(a=[$0], metadata_3=[$6], c=[$2]) + +- LogicalFilter(condition=[=($7, 1)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c]) ++- Calc(select=[a, metadata_3, c]) + +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[$7 ASC], select=[a, c, metadata_3, $7]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, metadata_3, PROCTIME() AS $7]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3]) +]]> + </Resource> + </TestCase> + <TestCase name="testProctimeDedupOnCdcWithMetadataSinkWithPk[nonDeterministicUpdateStrategy=IGNORE]"> + <Resource name="sql"> + <![CDATA[ +insert into sink_with_pk +SELECT a, metadata_3, c +FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum + FROM cdc_with_meta +) +WHERE rowNum = 1 + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, metadata_3, c]) ++- LogicalProject(a=[$0], metadata_3=[$6], c=[$2]) + +- LogicalFilter(condition=[=($7, 1)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, 
metadata_3, c]) ++- Calc(select=[a, metadata_3, c]) + +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[$7 ASC], select=[a, c, metadata_3, $7]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, metadata_3, PROCTIME() AS $7]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3]) ]]> </Resource> </TestCase> @@ -2879,6 +2947,43 @@ Sink(table=[default_catalog.default_database.sink_with_composite_pk], fields=[a, +- GroupAggregate(groupBy=[a], select=[a, MAX(c) AS c, SUM(b) AS cnt]) +- Exchange(distribution=[hash[a]]) +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a, c, b], metadata=[]]], fields=[a, c, b]) +]]> + </Resource> + </TestCase> + <TestCase name="testRowtimeDedupOnCdcWithMetadataSinkWithPk[nonDeterministicUpdateStrategy=IGNORE]"> + <Resource name="sql"> + <![CDATA[ +insert into sink_with_pk +SELECT a, b, c +FROM ( + SELECT *, + ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum + FROM cdc_with_meta_and_wm +) +WHERE rowNum = 1 + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c]) ++- LogicalProject(a=[$0], b=[$1], c=[$2]) + +- LogicalFilter(condition=[=($5, 1)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], op_ts=[$4], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST)]) + +- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], op_ts=[CAST($4):TIMESTAMP_LTZ(3) *ROWTIME*]) + +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta_and_wm, metadata=[op_ts]]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c]) ++- Calc(select=[a, b, c]) + +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME op_ts ASC], select=[a, b, c, op_ts]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, op_ts]) + +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, CAST(op_ts AS TIMESTAMP_LTZ(3) *ROWTIME*) AS op_ts]) + +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta_and_wm, metadata=[op_ts]]], fields=[a, b, c, d, op_ts]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml index ed4bc9f8583..6ea1d56dc73 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml @@ -1174,13 +1174,13 @@ LogicalProject(c=[$0], b=[$1], d=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> - <Resource name="optimized rel plan"> + <Resource name="optimized exec plan"> <![CDATA[ -Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d]) +Rank(strategy=[RetractStrategy], 
rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d]) +- Exchange(distribution=[hash[c, b]]) - +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM(a) FILTER $f3 AS d]) + +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM_RETRACT(a) FILTER $f3 AS d]) +- Exchange(distribution=[hash[c, b]]) - +- Calc(select=[c, b, a, IS TRUE(>(a, 0)) AS $f3]) + +- Calc(select=[c, b, a, (a > 0) IS TRUE AS $f3]) +- Deduplicate(keep=[FirstRow], key=[c], order=[PROCTIME]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, b, c, PROCTIME() AS $5]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml index 53acfb61550..675d83f5f9c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml @@ -635,7 +635,7 @@ Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I]) +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) +- Exchange(distribution=[single], changelogMode=[I,UB,UA,D]) +- Calc(select=[rowtime], changelogMode=[I,UB,UA,D]) - +- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME], changelogMode=[I,UB,UA,D]) + +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime DESC], select=[a, rowtime], changelogMode=[I,UB,UA,D]) +- Exchange(distribution=[hash[a]], changelogMode=[I]) +- Calc(select=[a, rowtime], changelogMode=[I]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index 75fb934413f..b34656f369f 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -2454,7 +2454,7 @@ LogicalProject(c=[$2], EXPR$1=[$4]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> - <Resource name="optimized rel plan"> + <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, EXPR$1]) +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1]) @@ -2464,7 +2464,7 @@ Calc(select=[c, EXPR$1]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, proctime, window_start, window_end, window_time]) +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) +- Calc(select=[a, b, c, 
d, e, rowtime, PROCTIME() AS proctime]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) ]]> @@ -2497,7 +2497,7 @@ LogicalProject(c=[$2], EXPR$1=[$4]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> - <Resource name="optimized rel plan"> + <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, EXPR$1]) +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1]) @@ -2507,7 +2507,7 @@ Calc(select=[c, EXPR$1]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, proctime, window_start, window_end, window_time]) +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])]) - +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) ]]> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala index ff6eed9be7d..83cd4f011b3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala @@ -665,7 +665,8 @@ class FlinkRelMdHandlerTestBase { new ConstantRankRange(1, 5), new RelDataTypeFieldImpl("rk", 7, longType), outputRankNumber = true, - RankProcessStrategy.UNDEFINED_STRATEGY + RankProcessStrategy.UNDEFINED_STRATEGY, + sortOnRowTime = false ) (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank) @@ -753,7 +754,8 @@ class FlinkRelMdHandlerTestBase { new ConstantRankRange(3, 5), new RelDataTypeFieldImpl("rk", 7, longType), outputRankNumber = true, - RankProcessStrategy.UNDEFINED_STRATEGY + RankProcessStrategy.UNDEFINED_STRATEGY, + sortOnRowTime = false ) (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank) @@ -805,7 +807,8 @@ class FlinkRelMdHandlerTestBase { new ConstantRankRange(3, 6), new RelDataTypeFieldImpl("rn", 7, longType), outputRankNumber = true, - RankProcessStrategy.UNDEFINED_STRATEGY + RankProcessStrategy.UNDEFINED_STRATEGY, + sortOnRowTime = false ) (logicalRowNumber, flinkLogicalRowNumber, streamRowNumber) @@ -962,7 +965,8 @@ class FlinkRelMdHandlerTestBase { new VariableRankRange(3), new RelDataTypeFieldImpl("rk", 7, longType), outputRankNumber = true, - RankProcessStrategy.UNDEFINED_STRATEGY + RankProcessStrategy.UNDEFINED_STRATEGY, + sortOnRowTime = false ) (logicalRankWithVariableRange, flinkLogicalRankWithVariableRange, streamRankWithVariableRange) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala index f0fe449386a..2bab70f2d68 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala @@ -71,6 +71,32 @@ class DeduplicateTest extends TableTestBase { util.verifyExecPlan(sql) } + @Test + def testInvalidChangelogInput(): Unit = { + util.tableEnv.executeSql(""" + |create temporary table cdc ( + | a int, + | b bigint, + | ts timestamp_ltz(3), + | primary key (a) not enforced, + | watermark for ts as ts - interval '5' second + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB,D' + |)""".stripMargin) + val sql = + """ + |SELECT * + |FROM ( + | SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY ts DESC) as rank_num + | FROM cdc) + |WHERE rank_num = 1 + """.stripMargin + + // the input is not append-only, so it will not be translated to LastRow, but to Rank + util.verifyExecPlan(sql) + } + @Test def testLastRowWithWindowOnRowtime(): Unit = { util.tableEnv.getConfig diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala index e16f747dd39..fb43bd50c98 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala @@ -1625,68 +1625,80 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp @TestTemplate def testProctimeDedupOnCdcWithMetadataSinkWithPk(): Unit = { - // TODO this should be updated after StreamPhysicalDeduplicate supports consuming update - assertThatThrownBy( - () => - util.verifyExecPlanInsert( - """ - |insert into sink_with_pk - |SELECT a, metadata_3, c - |FROM ( - | SELECT *, - | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum - | FROM cdc_with_meta - |) - |WHERE rowNum = 1 - """.stripMargin)) - .hasMessageContaining( - "StreamPhysicalDeduplicate doesn't support consuming update and delete changes") - .isInstanceOf[TableException] + // a deduplicate query over an updating input now translates to a retract rank + val callable: ThrowingCallable = () => + util.verifyExecPlanInsert( + """ + |insert into sink_with_pk + |SELECT a, metadata_3, c + |FROM ( + | SELECT *, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum + | FROM cdc_with_meta + |) + |WHERE rowNum = 1 + """.stripMargin) + + if (tryResolve) { + assertThatThrownBy(callable) + .hasMessageContaining( + "The column(s): $7(generated by non-deterministic function: PROCTIME ) can not satisfy the determinism requirement") + .isInstanceOf[TableException] + } else { + assertThatCode(callable).doesNotThrowAnyException() + } } @TestTemplate def testProctimeDedupOnCdcWithMetadataSinkWithoutPk(): Unit = { - // TODO this should be updated after StreamPhysicalDeduplicate supports consuming update - assertThatThrownBy( - () => - util.verifyExecPlanInsert( - """ - |insert into sink_without_pk - |SELECT a, metadata_3, c - |FROM ( - | SELECT *, - | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum - | FROM cdc_with_meta - |) - |WHERE rowNum = 1 - """.stripMargin - )) - .hasMessageContaining( - "StreamPhysicalDeduplicate doesn't support consuming update and delete changes") - .isInstanceOf[TableException] + // a deduplicate query over an updating input now translates to a retract rank + val callable: ThrowingCallable = () 
=> + util.verifyExecPlanInsert( + """ + |insert into sink_without_pk + |SELECT a, metadata_3, c + |FROM ( + | SELECT *, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum + | FROM cdc_with_meta + |) + |WHERE rowNum = 1 + """.stripMargin) + if (tryResolve) { + assertThatThrownBy(callable) + .hasMessageContaining( + "The column(s): $7(generated by non-deterministic function: PROCTIME ) can not satisfy the determinism requirement") + .isInstanceOf[TableException] + } else { + assertThatCode(callable).doesNotThrowAnyException() + } } @TestTemplate def testRowtimeDedupOnCdcWithMetadataSinkWithPk(): Unit = { - // TODO this should be updated after StreamPhysicalDeduplicate supports consuming update - assertThatThrownBy( - () => - util.verifyExecPlanInsert( - """ - |insert into sink_with_pk - |SELECT a, b, c - |FROM ( - | SELECT *, - | ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum - | FROM cdc_with_meta_and_wm - |) - |WHERE rowNum = 1 - """.stripMargin - )) - .hasMessageContaining( - "StreamPhysicalDeduplicate doesn't support consuming update and delete changes") - .isInstanceOf[TableException] + // a deduplicate query over an updating input now translates to a retract rank + val callable: ThrowingCallable = () => + util.verifyExecPlanInsert( + """ + |insert into sink_with_pk + |SELECT a, b, c + |FROM ( + | SELECT *, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum + | FROM cdc_with_meta_and_wm + |) + |WHERE rowNum = 1 + """.stripMargin) + + if (tryResolve) { + assertThatThrownBy(callable) + .hasMessageContaining( + "The metadata column(s): 'op_ts' in cdc source may cause wrong result or error on downstream operators") + .isInstanceOf[TableException] + } else { + assertThatCode(callable).doesNotThrowAnyException() + } } @TestTemplate diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala index 8cfec66a3cb..6ffe892ba85 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala @@ -842,14 +842,15 @@ class RankTest extends TableTestBase { |CREATE VIEW v1 AS |SELECT c, b, SUM(a) FILTER (WHERE a > 0) AS d FROM v0 GROUP BY c, b |""".stripMargin) - util.verifyRelPlan(""" - |SELECT c, b, d - |FROM ( - | SELECT - | c, b, d, - | ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1 - |) WHERE rn < 10 - |""".stripMargin) + util.verifyExecPlan( + """ + |SELECT c, b, d + |FROM ( + | SELECT + | c, b, d, + | ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1 + |) WHERE rn < 10 + |""".stripMargin) } @Test def testUpdatableRankAfterLookupJoin(): Unit = { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index 2a1270f08c2..c16a413aff4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -1684,7 +1684,7 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl @TestTemplate def testProctimeWindowTVFWithDedupWhenCantMerge(): Unit = { - util.verifyRelPlan( + util.verifyExecPlan( """ |select c, count(a) |from (

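For reference, the decision this commit moves out of the physical rules and into StreamPhysicalRank#translateToExecNode can be illustrated standalone. The sketch below is a minimal Scala approximation, not the planner's actual API: RankNode and TimeIndicator are hypothetical stand-ins for the Calcite/Flink classes, and the three predicates mirror RankUtil.isDeduplication, RankUtil.canConvertToDeduplicate, and RankUtil.keepLastRow from the diff above.

object DedupDecisionSketch {

  sealed trait TimeIndicator
  case object Proctime extends TimeIndicator
  case object Rowtime extends TimeIndicator
  case object NoTime extends TimeIndicator

  // Simplified stand-in for a stream Rank node: ROW_NUMBER over a
  // single-field order key, with a [rankStart, rankEnd] rank range.
  final case class RankNode(
      outputRankNumber: Boolean,
      isRowNumber: Boolean,
      rankStart: Long,
      rankEnd: Long,
      orderFieldTime: TimeIndicator,
      orderDescending: Boolean,
      inputInsertOnly: Boolean)

  // rankStart == rankEnd == 1 corresponds to "WHERE rownum = 1" (Top-1).
  private def isTop1(r: RankNode): Boolean = r.rankStart == 1 && r.rankEnd == 1

  // Mirrors RankUtil.isDeduplication: Top-1 ROW_NUMBER that does not emit the number.
  def isDeduplication(r: RankNode): Boolean =
    !r.outputRankNumber && r.isRowNumber && isTop1(r)

  // Mirrors RankUtil.canConvertToDeduplicate: additionally requires an
  // insert-only input and an order key that is exactly one time attribute.
  def canConvertToDeduplicate(r: RankNode): Boolean =
    isDeduplication(r) && r.inputInsertOnly && r.orderFieldTime != NoTime

  // Mirrors RankUtil.keepLastRow: descending time order keeps the last row.
  def keepLastRow(r: RankNode): Boolean = r.orderDescending

  def main(args: Array[String]): Unit = {
    // ORDER BY proctime ASC ... WHERE rownum = 1 on an insert-only input:
    // translates to Deduplicate(keep=[FirstRow]).
    val firstRow = RankNode(
      outputRankNumber = false,
      isRowNumber = true,
      rankStart = 1,
      rankEnd = 1,
      orderFieldTime = Proctime,
      orderDescending = false,
      inputInsertOnly = true)
    println(canConvertToDeduplicate(firstRow)) // true
    println(keepLastRow(firstRow)) // false => keep FirstRow

    // The same query over a CDC (updating) input stays a Rank with
    // RetractStrategy instead of raising an exception, as the new tests show.
    val onCdc = firstRow.copy(inputInsertOnly = false)
    println(canConvertToDeduplicate(onCdc)) // false
  }
}

Because the exec-node choice now happens at translation time, after changelog-mode inference, a Top-1 query over an updating source simply falls back to a retract Rank, whereas the removed rule had to fail at planning time with "StreamPhysicalDeduplicate doesn't support consuming update and delete changes".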