This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1543c72455629842a9424aa11ea7641fb5d13760
Author: lincoln lee <[email protected]>
AuthorDate: Tue Sep 24 10:24:42 2024 +0800

    [FLINK-34702][table-planner] Avoid using StreamPhysicalDeduplicate and instead decide to perform exec node transformations in StreamPhysicalRank
---
 .../stream/StreamPhysicalDeduplicate.scala         |   7 +-
 .../nodes/physical/stream/StreamPhysicalRank.scala |  71 +++++++++----
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   1 -
 .../stream/StreamPhysicalDeduplicateRule.scala     |  99 -----------------
 .../physical/stream/StreamPhysicalRankRule.scala   |  12 +--
 .../flink/table/planner/plan/utils/RankUtil.scala  |  54 ++++++++--
 .../nodes/exec/operator/StreamOperatorNameTest.xml |   4 +-
 .../physical/stream/ChangelogModeInferenceTest.xml |   2 +-
 .../planner/plan/stream/sql/DeduplicateTest.xml    |  33 +++++-
 .../plan/stream/sql/NonDeterministicDagTest.xml    | 105 ++++++++++++++++++
 .../table/planner/plan/stream/sql/RankTest.xml     |   8 +-
 .../plan/stream/sql/agg/GroupWindowTest.xml        |   2 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    |   8 +-
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  12 ++-
 .../planner/plan/stream/sql/DeduplicateTest.scala  |  26 +++++
 .../plan/stream/sql/NonDeterministicDagTest.scala  | 118 ++++++++++++---------
 .../table/planner/plan/stream/sql/RankTest.scala   |  17 +--
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |   2 +-
 18 files changed, 361 insertions(+), 220 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala
index d732e21d0c0..dbc5e60043d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeduplicate.scala
@@ -30,9 +30,10 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import java.util

 /**
- * Stream physical RelNode which deduplicate on keys and keeps only first row or last row. This node
- * is an optimization of [[StreamPhysicalRank]] for some special cases. Compared to
- * [[StreamPhysicalRank]], this node could use mini-batch and access less state.
+ * TODO to be removed after FLINK-34702 is fixed. Stream physical RelNode which deduplicate on keys
+ * and keeps only first row or last row. This node is an optimization of [[StreamPhysicalRank]] for
+ * some special cases. Compared to [[StreamPhysicalRank]], this node could use mini-batch and access
+ * less state.
  */
 class StreamPhysicalDeduplicate(
     cluster: RelOptCluster,
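
For context, the "special cases" named in the Scaladoc above are top-1 ROW_NUMBER queries ordered
on a time attribute. A minimal sketch of that query shape, in the embedded-SQL style the planner
tests in this commit use (the table name and schema are illustrative, not part of this change):

    // Keeps the first row per key `a` in processing time; ORDER BY ... DESC
    // on the time attribute would keep the last row instead.
    val dedupSql =
      """
        |SELECT a, b, c
        |FROM (
        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS row_num
        |  FROM MyTable
        |) WHERE row_num <= 1
        |""".stripMargin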
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala
index 534c9dda8d2..084d59d9893 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalRank.scala
@@ -21,7 +21,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.calcite.Rank
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.spec.PartitionSpec
-import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank
+import org.apache.flink.table.planner.plan.nodes.exec.stream.{StreamExecDeduplicate, StreamExecRank}
 import org.apache.flink.table.planner.plan.utils._
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 import org.apache.flink.table.runtime.operators.rank._
@@ -46,8 +46,9 @@ class StreamPhysicalRank(
     rankRange: RankRange,
     rankNumberType: RelDataTypeField,
     outputRankNumber: Boolean,
-    val rankStrategy: RankProcessStrategy)
-  extends Rank(
+    val rankStrategy: RankProcessStrategy,
+    val sortOnRowTime: Boolean
+) extends Rank(
     cluster,
     traitSet,
     inputRel,
@@ -59,7 +60,7 @@ class StreamPhysicalRank(
     outputRankNumber)
   with StreamPhysicalRel {
 
-  override def requireWatermark: Boolean = false
+  override def requireWatermark: Boolean = sortOnRowTime
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new StreamPhysicalRank(
@@ -72,7 +73,9 @@ class StreamPhysicalRank(
       rankRange,
       rankNumberType,
       outputRankNumber,
-      rankStrategy)
+      rankStrategy,
+      sortOnRowTime
+    )
   }
 
   def copy(newStrategy: RankProcessStrategy): StreamPhysicalRank = {
@@ -86,7 +89,9 @@ class StreamPhysicalRank(
       rankRange,
       rankNumberType,
       outputRankNumber,
-      newStrategy)
+      newStrategy,
+      sortOnRowTime
+    )
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -96,24 +101,50 @@ class StreamPhysicalRank(
       .item("rankType", rankType)
       .item("rankRange", rankRange.toString(inputRowType.getFieldNames))
       .item("partitionBy", RelExplainUtil.fieldToString(partitionKey.toArray, 
inputRowType))
-      .item("orderBy", RelExplainUtil.collationToString(orderKey, 
inputRowType))
+      .item(
+        "orderBy",
+        (if (sortOnRowTime) {
+           "ROWTIME "
+         } else "") + RelExplainUtil.collationToString(orderKey, inputRowType))
       .item("select", getRowType.getFieldNames.mkString(", "))
   }
 
+  private def getDeduplicateDescription(isRowtime: Boolean, isLastRow: Boolean): String = {
+    val fieldNames = getRowType.getFieldNames
+    val orderString = if (isRowtime) "ROWTIME" else "PROCTIME"
+    val keep = if (isLastRow) "LastRow" else "FirstRow"
+    s"Deduplicate(keep=[$keep], key=[${partitionKey.toArray.map(fieldNames.get).mkString(", ")}], order=[$orderString])"
+  }
+
   override def translateToExecNode(): ExecNode[_] = {
     val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
-    val fieldCollations = orderKey.getFieldCollations
-    new StreamExecRank(
-      unwrapTableConfig(this),
-      rankType,
-      new PartitionSpec(partitionKey.toArray),
-      SortUtil.getSortSpec(fieldCollations),
-      rankRange,
-      rankStrategy,
-      outputRankNumber,
-      generateUpdateBefore,
-      InputProperty.DEFAULT,
-      FlinkTypeFactory.toLogicalRowType(getRowType),
-      getRelDetailedDescription)
+
+    if (RankUtil.canConvertToDeduplicate(this)) {
+      val keepLastRow = RankUtil.keepLastRow(orderKey)
+
+      new StreamExecDeduplicate(
+        unwrapTableConfig(this),
+        partitionKey.toArray,
+        sortOnRowTime,
+        keepLastRow,
+        generateUpdateBefore,
+        InputProperty.DEFAULT,
+        FlinkTypeFactory.toLogicalRowType(getRowType),
+        getDeduplicateDescription(sortOnRowTime, keepLastRow))
+    } else {
+      new StreamExecRank(
+        unwrapTableConfig(this),
+        rankType,
+        new PartitionSpec(partitionKey.toArray),
+        SortUtil.getSortSpec(orderKey.getFieldCollations),
+        rankRange,
+        rankStrategy,
+        outputRankNumber,
+        generateUpdateBefore,
+        InputProperty.DEFAULT,
+        FlinkTypeFactory.toLogicalRowType(getRowType),
+        getRelDetailedDescription
+      )
+    }
   }
 }
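
The hunk above is the core of the change: the dedup-vs-rank choice now happens when the physical
rank node is translated to an exec node, not when planner rules fire. A condensed model of that
decision, assuming the conditions documented in RankUtil (a simplified sketch with stand-in names,
not the actual Flink classes):

    object DedupDecisionSketch {
      // Simplified stand-in for the inputs consulted by StreamPhysicalRank#translateToExecNode.
      final case class RankTraits(
          outputRankNumber: Boolean, // is the rank number projected out?
          isRowNumber: Boolean, // rankType == RankType.ROW_NUMBER
          isTop1: Boolean, // rankRange == [1, 1]
          inputInsertOnly: Boolean, // ChangelogPlanUtils.inputInsertOnly
          sortOnSingleTimeAttribute: Boolean) // exactly one collation, on proctime/rowtime

      // Mirrors RankUtil.canConvertToDeduplicate: only when all conditions hold does
      // translation produce StreamExecDeduplicate instead of StreamExecRank.
      def translatesToDeduplicate(t: RankTraits): Boolean =
        !t.outputRankNumber && t.isRowNumber && t.isTop1 &&
          t.inputInsertOnly && t.sortOnSingleTimeAttribute
    }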
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 50b30890333..109a982d216 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -444,7 +444,6 @@ object FlinkStreamRuleSets {
     StreamPhysicalTemporalSortRule.INSTANCE,
     // rank
     StreamPhysicalRankRule.INSTANCE,
-    StreamPhysicalDeduplicateRule.INSTANCE,
     // expand
     StreamPhysicalExpandRule.INSTANCE,
     // group agg
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
deleted file mode 100644
index e232a4646be..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalDeduplicateRule.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
-import org.apache.flink.table.planner.plan.utils.RankUtil
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.convert.ConverterRule.Config
-
-/**
- * Rule that matches [[FlinkLogicalRank]] which is sorted by time attribute and limits 1 and its
- * rank type is ROW_NUMBER, and converts it to [[StreamPhysicalDeduplicate]].
- *
- * NOTES: Queries that can be converted to [[StreamPhysicalDeduplicate]] could be converted to
- * [[StreamPhysicalRank]] too. [[StreamPhysicalDeduplicate]] is more efficient than
- * [[StreamPhysicalRank]] due to mini-batch and less state access.
- *
- * e.g.
- *   1. {{{ SELECT a, b, c FROM ( SELECT a, b, c, proctime, ROW_NUMBER() OVER (PARTITION BY a ORDER
- *      BY proctime ASC) as row_num FROM MyTable ) WHERE row_num <= 1 }}} will be converted to
- *      StreamExecDeduplicate which keeps first row in proctime.
- *
- * 2. {{{ SELECT a, b, c FROM ( SELECT a, b, c, rowtime, ROW_NUMBER() OVER (PARTITION BY a ORDER BY
- * rowtime DESC) as row_num FROM MyTable ) WHERE row_num <= 1 }}} will be converted to
- * StreamExecDeduplicate which keeps last row in rowtime.
- */
-class StreamPhysicalDeduplicateRule(config: Config) extends ConverterRule(config) {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val rank: FlinkLogicalRank = call.rel(0)
-    RankUtil.canConvertToDeduplicate(rank)
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-    val rank = rel.asInstanceOf[FlinkLogicalRank]
-    val requiredDistribution = if (rank.partitionKey.isEmpty) {
-      FlinkRelDistribution.SINGLETON
-    } else {
-      FlinkRelDistribution.hash(rank.partitionKey.toList)
-    }
-    val requiredTraitSet = rel.getCluster.getPlanner
-      .emptyTraitSet()
-      .replace(FlinkConventions.STREAM_PHYSICAL)
-      .replace(requiredDistribution)
-    val convInput: RelNode = RelOptRule.convert(rank.getInput, requiredTraitSet)
-
-    // order by timeIndicator desc ==> lastRow, otherwise is firstRow
-    val fieldCollation = rank.orderKey.getFieldCollations.get(0)
-    val isLastRow = fieldCollation.direction.isDescending
-
-    val fieldType = rank
-      .getInput()
-      .getRowType
-      .getFieldList
-      .get(fieldCollation.getFieldIndex)
-      .getType
-    val isRowtime = FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
-
-    val providedTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    new StreamPhysicalDeduplicate(
-      rel.getCluster,
-      providedTraitSet,
-      convInput,
-      rank.partitionKey.toArray,
-      isRowtime,
-      isLastRow)
-  }
-}
-
-object StreamPhysicalDeduplicateRule {
-  val INSTANCE = new StreamPhysicalDeduplicateRule(
-    Config.INSTANCE.withConversion(
-      classOf[FlinkLogicalRank],
-      FlinkConventions.LOGICAL,
-      FlinkConventions.STREAM_PHYSICAL,
-      "StreamPhysicalDeduplicateRule"))
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
index 08c3616ae1f..0c878a8fdcc 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalRankRule.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
 import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
 import org.apache.flink.table.planner.plan.utils.{RankProcessStrategy, RankUtil}
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.convert.ConverterRule.Config
@@ -34,11 +34,6 @@ import org.apache.calcite.rel.convert.ConverterRule.Config
  */
 class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) {
 
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val rank: FlinkLogicalRank = call.rel(0)
-    !RankUtil.canConvertToDeduplicate(rank)
-  }
-
   override def convert(rel: RelNode): RelNode = {
     val rank = rel.asInstanceOf[FlinkLogicalRank]
     val input = rank.getInput
@@ -53,6 +48,8 @@ class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) {
     val providedTraitSet = rank.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
     val newInput: RelNode = RelOptRule.convert(input, requiredTraitSet)
 
+    val sortOnRowTime = RankUtil.sortOnRowTime(rank.orderKey, input.getRowType)
+
     new StreamPhysicalRank(
       rank.getCluster,
       providedTraitSet,
@@ -63,7 +60,8 @@ class StreamPhysicalRankRule(config: Config) extends ConverterRule(config) {
       rank.rankRange,
       rank.rankNumberType,
       rank.outputRankNumber,
-      RankProcessStrategy.UNDEFINED_STRATEGY)
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowTime)
   }
 }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
index 55325b70f0f..d0f1efef198 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala
@@ -22,8 +22,8 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.ExpressionReducer
 import org.apache.flink.table.planner.plan.nodes.calcite.Rank
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeduplicate
-import org.apache.flink.table.runtime.operators.rank._
+import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDeduplicate, StreamPhysicalRank}
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, ConstantRankRangeWithoutEnd, RankRange, RankType, VariableRankRange}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.`type`.RelDataType
@@ -350,21 +350,55 @@ object RankUtil {
     }
 
     val inputRowType = rank.getInput.getRowType
-    val isSortOnTimeAttribute = sortOnTimeAttribute(sortCollation, inputRowType)
+    val isSortOnTimeAttribute = sortOnTimeAttributeOnly(sortCollation, inputRowType)

     !rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType
   }
 
-  private def sortOnTimeAttribute(
+  private def sortOnTimeAttributeOnly(
       sortCollation: RelCollation,
       inputRowType: RelDataType): Boolean = {
     if (sortCollation.getFieldCollations.size() != 1) {
-      false
-    } else {
-      val firstSortField = sortCollation.getFieldCollations.get(0)
-      val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
-      FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
-      FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+      return false
+    }
+    val firstSortField = sortCollation.getFieldCollations.get(0)
+    val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+    FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
+    FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+  }
+
+  /**
+   * Checks if the given sort collation has a field collation which is based on a rowtime attribute.
+   */
+  def sortOnRowTime(sortCollation: RelCollation, inputRowType: RelDataType): Boolean = {
+    sortCollation.getFieldCollations.exists {
+      firstSortField =>
+        val fieldType = inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+        FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+    }
+  }
+
+  /** Whether the given rank is logically a deduplication. */
+  def isDeduplication(rank: Rank): Boolean = {
+    !rank.outputRankNumber && rank.rankType == RankType.ROW_NUMBER && isTop1(rank.rankRange)
+  }
+
+  /** Whether the given [[StreamPhysicalRank]] could be converted to [[StreamExecDeduplicate]]. */
+  def canConvertToDeduplicate(rank: StreamPhysicalRank): Boolean = {
+    lazy val inputInsertOnly = ChangelogPlanUtils.inputInsertOnly(rank)
+    lazy val sortOnTimeAttributeOnly =
+      RankUtil.sortOnTimeAttributeOnly(rank.orderKey, rank.getInput.getRowType)
+
+    isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly
+  }
+
+  /** Determines if the given order key indicates that the last row should be kept. */
+  def keepLastRow(orderKey: RelCollation): Boolean = {
+    // order by timeIndicator desc ==> lastRow, otherwise is firstRow
+    if (orderKey.getFieldCollations.size() != 1) {
+      return false
     }
+    val fieldCollation = orderKey.getFieldCollations.get(0)
+    fieldCollation.direction.isDescending
   }
 }
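
The helpers added above encode the plan-description semantics visible in the updated test XML: a
single descending collation on the time attribute means "keep last row", anything else keeps the
first row, and a rowtime collation labels the order as ROWTIME rather than PROCTIME. A tiny
stand-alone model of that mapping (a simplified sketch; the function name is illustrative):

    // Mirrors RankUtil.keepLastRow plus the ROWTIME/PROCTIME label chosen in
    // StreamPhysicalRank#getDeduplicateDescription.
    def describeDedup(isRowtime: Boolean, singleDescendingCollation: Boolean): String = {
      val keep = if (singleDescendingCollation) "LastRow" else "FirstRow"
      val order = if (isRowtime) "ROWTIME" else "PROCTIME"
      s"Deduplicate(keep=[$keep], order=[$order])"
    }

    // e.g. ORDER BY rowtime DESC  -> Deduplicate(keep=[LastRow], order=[ROWTIME])
    //      ORDER BY proctime ASC  -> Deduplicate(keep=[FirstRow], order=[PROCTIME])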
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml
index 953a8b85d46..52538228c6b 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/StreamOperatorNameTest.xml
@@ -282,7 +282,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
 
 == Optimized Physical Plan ==
 Calc(select=[a, b, c])
-+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime ASC], select=[a, b, c, rowtime])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, b, c, rowtime])
          +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
@@ -377,7 +377,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
 
 == Optimized Physical Plan ==
 Calc(select=[a, b, c])
-+- Deduplicate(keep=[FirstRow], key=[a], order=[ROWTIME])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime ASC], select=[a, b, c, rowtime])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, b, c, rowtime])
          +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index 7719e16d90b..e07f6c5b402 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -321,7 +321,7 @@ Calc(select=[amount, currency, rowtime, PROCTIME_MATERIALIZE(proctime) AS procti
    :     +- Calc(select=[amount, currency, rowtime, PROCTIME() AS proctime], changelogMode=[I])
    :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, Orders, source: [CollectionTableSource(amount, currency, rowtime)]]], fields=[amount, currency, rowtime], changelogMode=[I])
    +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
-      +- Deduplicate(keep=[LastRow], key=[currency], order=[ROWTIME], changelogMode=[I,UA,D])
+      +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[currency], orderBy=[ROWTIME rowtime DESC], select=[currency, rate, rowtime], changelogMode=[I,UA,D])
          +- Exchange(distribution=[hash[currency]], changelogMode=[I])
             +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[I])
                +- LegacyTableSourceScan(table=[[default_catalog, default_database, ratesHistory, source: [CollectionTableSource(currency, rate, rowtime)]]], fields=[currency, rate, rowtime], changelogMode=[I])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
index e5a52edd290..c23a3820a60 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
@@ -16,6 +16,35 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testInvalidChangelogInput">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM (
+  SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY ts DESC) as rank_num
+  FROM cdc)
+WHERE rank_num = 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], rank_num=[$1])
++- LogicalFilter(condition=[=($1, 1)])
+   +- LogicalProject(a=[$0], rank_num=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $2 DESC NULLS LAST)])
+      +- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($2, 5000:INTERVAL SECOND)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, cdc]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, 1 AS $1])
++- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[b], orderBy=[ROWTIME ts DESC], select=[a, b, ts])
+   +- Exchange(distribution=[hash[b]])
+      +- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 5000:INTERVAL SECOND)])
+         +- TableSourceScan(table=[[default_catalog, default_database, cdc]], fields=[a, b, ts])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testInvalidRowNumberConditionOnProctime">
     <Resource name="sql">
       <![CDATA[
@@ -65,7 +94,7 @@ LogicalProject(a=[$0], rank_num=[$1])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, 3 AS $1])
-+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=3, rankEnd=3], partitionBy=[b], orderBy=[rowtime DESC], select=[a, b, rowtime])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=3, rankEnd=3], partitionBy=[b], orderBy=[ROWTIME rowtime DESC], select=[a, b, rowtime])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[a, b, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -89,7 +118,7 @@ Calc(select=[b, EXPR$1, w$start AS EXPR$2])
 +- GroupWindowAggregate(groupBy=[b], window=[TumblingGroupWindow('w$, ts, 4)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[b, SUM(a) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[b, ts, a])
-         +- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME])
+         +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME ts DESC], select=[a, b, ts])
             +- Exchange(distribution=[hash[a]])
                +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [CollectionTableSource(a, b, ts)]]], fields=[a, b, ts])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
index 646770d5f42..a0a00b51d83 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.xml
@@ -2749,6 +2749,74 @@ Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, EXPR$1
      +- Exchange(distribution=[hash[a]])
         +- Calc(select=[a, b, proctime, CAST(a AS BIGINT) AS $3])
            +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
  <TestCase name="testProctimeDedupOnCdcWithMetadataSinkWithoutPk[nonDeterministicUpdateStrategy=IGNORE]">
+    <Resource name="sql">
+      <![CDATA[
+insert into sink_without_pk
+SELECT a, metadata_3, c
+FROM (
+  SELECT *,
+    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+  FROM cdc_with_meta
+)
+WHERE rowNum = 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
++- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
+   +- LogicalFilter(condition=[=($7, 1)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
+            +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink_without_pk], fields=[a, metadata_3, c])
++- Calc(select=[a, metadata_3, c])
+   +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[$7 ASC], select=[a, c, metadata_3, $7])
+      +- Exchange(distribution=[hash[a]])
+         +- Calc(select=[a, c, metadata_3, PROCTIME() AS $7])
+            +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3])
+]]>
+    </Resource>
+  </TestCase>
  <TestCase name="testProctimeDedupOnCdcWithMetadataSinkWithPk[nonDeterministicUpdateStrategy=IGNORE]">
+    <Resource name="sql">
+      <![CDATA[
+insert into sink_with_pk
+SELECT a, metadata_3, c
+FROM (
+  SELECT *,
+    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+  FROM cdc_with_meta
+)
+WHERE rowNum = 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, metadata_3, c])
++- LogicalProject(a=[$0], metadata_3=[$6], c=[$2])
+   +- LogicalFilter(condition=[=($7, 1)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], metadata_1=[$4], metadata_2=[$5], metadata_3=[$6])
+            +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, metadata_3, c])
++- Calc(select=[a, metadata_3, c])
+   +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[$7 ASC], select=[a, c, metadata_3, $7])
+      +- Exchange(distribution=[hash[a]])
+         +- Calc(select=[a, c, metadata_3, PROCTIME() AS $7])
+            +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta, metadata=[metadata_1, metadata_2, metadata_3]]], fields=[a, b, c, d, metadata_1, metadata_2, metadata_3])
 ]]>
     </Resource>
   </TestCase>
@@ -2879,6 +2947,43 @@ Sink(table=[default_catalog.default_database.sink_with_composite_pk], fields=[a,
          +- GroupAggregate(groupBy=[a], select=[a, MAX(c) AS c, SUM(b) AS cnt])
             +- Exchange(distribution=[hash[a]])
                +- TableSourceScan(table=[[default_catalog, default_database, src, project=[a, c, b], metadata=[]]], fields=[a, c, b])
+]]>
+    </Resource>
+  </TestCase>
  <TestCase name="testRowtimeDedupOnCdcWithMetadataSinkWithPk[nonDeterministicUpdateStrategy=IGNORE]">
+    <Resource name="sql">
+      <![CDATA[
+insert into sink_with_pk
+SELECT a, b, c
+FROM (
+  SELECT *,
+    ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum
+  FROM cdc_with_meta_and_wm
+)
+WHERE rowNum = 1
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[=($5, 1)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], op_ts=[$4], rowNum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $4 NULLS FIRST)])
+         +- LogicalWatermarkAssigner(rowtime=[op_ts], watermark=[-($4, 5000:INTERVAL SECOND)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], op_ts=[CAST($4):TIMESTAMP_LTZ(3) *ROWTIME*])
+               +- LogicalTableScan(table=[[default_catalog, default_database, cdc_with_meta_and_wm, metadata=[op_ts]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink_with_pk], fields=[a, b, c])
++- Calc(select=[a, b, c])
+   +- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME op_ts ASC], select=[a, b, c, op_ts])
+      +- Exchange(distribution=[hash[a]])
+         +- Calc(select=[a, b, c, op_ts])
+            +- WatermarkAssigner(rowtime=[op_ts], watermark=[(op_ts - 5000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, d, CAST(op_ts AS TIMESTAMP_LTZ(3) *ROWTIME*) AS op_ts])
+                  +- TableSourceScan(table=[[default_catalog, default_database, cdc_with_meta_and_wm, metadata=[op_ts]]], fields=[a, b, c, d, op_ts])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index ed4bc9f8583..6ea1d56dc73 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -1174,13 +1174,13 @@ LogicalProject(c=[$0], b=[$1], d=[$2])
                      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
-    <Resource name="optimized rel plan">
+    <Resource name="optimized exec plan">
       <![CDATA[
-Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d])
+Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=9], partitionBy=[c, b], orderBy=[d DESC], select=[c, b, d])
 +- Exchange(distribution=[hash[c, b]])
-   +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM(a) FILTER $f3 AS d])
+   +- GroupAggregate(groupBy=[c, b], select=[c, b, SUM_RETRACT(a) FILTER $f3 AS d])
       +- Exchange(distribution=[hash[c, b]])
-         +- Calc(select=[c, b, a, IS TRUE(>(a, 0)) AS $f3])
+         +- Calc(select=[c, b, a, (a > 0) IS TRUE AS $f3])
             +- Deduplicate(keep=[FirstRow], key=[c], order=[PROCTIME])
                +- Exchange(distribution=[hash[c]])
                   +- Calc(select=[a, b, c, PROCTIME() AS $5])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml
index 53acfb61550..675d83f5f9c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/GroupWindowTest.xml
@@ -635,7 +635,7 @@ Calc(select=[w$start AS EXPR$0, cnt], changelogMode=[I])
 +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 1000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS cnt, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I])
    +- Exchange(distribution=[single], changelogMode=[I,UB,UA,D])
       +- Calc(select=[rowtime], changelogMode=[I,UB,UA,D])
-         +- Deduplicate(keep=[LastRow], key=[a], order=[ROWTIME], changelogMode=[I,UB,UA,D])
+         +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[ROWTIME rowtime DESC], select=[a, rowtime], changelogMode=[I,UB,UA,D])
             +- Exchange(distribution=[hash[a]], changelogMode=[I])
                +- Calc(select=[a, rowtime], changelogMode=[I])
                   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index 75fb934413f..b34656f369f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2454,7 +2454,7 @@ LogicalProject(c=[$2], EXPR$1=[$4])
                         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
-    <Resource name="optimized rel plan">
+    <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, EXPR$1])
 +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
@@ -2464,7 +2464,7 @@ Calc(select=[c, EXPR$1])
             +- Exchange(distribution=[hash[c]])
                +- Calc(select=[a, c, proctime, window_start, window_end, window_time])
                   +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
-                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
                         +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
                            +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
 ]]>
@@ -2497,7 +2497,7 @@ LogicalProject(c=[$2], EXPR$1=[$4])
                         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
-    <Resource name="optimized rel plan">
+    <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, EXPR$1])
 +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT_RETRACT(a) AS EXPR$1])
@@ -2507,7 +2507,7 @@ Calc(select=[c, EXPR$1])
             +- Exchange(distribution=[hash[c]])
                +- Calc(select=[a, c, proctime, window_start, window_end, window_time])
                   +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
-                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
                         +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
                            +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
 ]]>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index ff6eed9be7d..83cd4f011b3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -665,7 +665,8 @@ class FlinkRelMdHandlerTestBase {
       new ConstantRankRange(1, 5),
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
-      RankProcessStrategy.UNDEFINED_STRATEGY
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowtime = false
     )
 
     (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
@@ -753,7 +754,8 @@ class FlinkRelMdHandlerTestBase {
       new ConstantRankRange(3, 5),
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
-      RankProcessStrategy.UNDEFINED_STRATEGY
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowtime = false
     )
 
     (logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank)
@@ -805,7 +807,8 @@ class FlinkRelMdHandlerTestBase {
       new ConstantRankRange(3, 6),
       new RelDataTypeFieldImpl("rn", 7, longType),
       outputRankNumber = true,
-      RankProcessStrategy.UNDEFINED_STRATEGY
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowtime = false
     )
 
     (logicalRowNumber, flinkLogicalRowNumber, streamRowNumber)
@@ -962,7 +965,8 @@ class FlinkRelMdHandlerTestBase {
       new VariableRankRange(3),
       new RelDataTypeFieldImpl("rk", 7, longType),
       outputRankNumber = true,
-      RankProcessStrategy.UNDEFINED_STRATEGY
+      RankProcessStrategy.UNDEFINED_STRATEGY,
+      sortOnRowtime = false
     )
 
     (logicalRankWithVariableRange, flinkLogicalRankWithVariableRange, streamRankWithVariableRange)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
index f0fe449386a..2bab70f2d68 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala
@@ -71,6 +71,32 @@ class DeduplicateTest extends TableTestBase {
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testInvalidChangelogInput(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create temporary table cdc (
+                               | a int,
+                               | b bigint,
+                               | ts timestamp_ltz(3),
+                               | primary key (a) not enforced,
+                               | watermark for ts as ts - interval '5' second
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB,D'
+                               |)""".stripMargin)
+    val sql =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY ts DESC) as rank_num
+        |  FROM cdc)
+        |WHERE rank_num = 1
+      """.stripMargin
+
+    // the input is not append-only, it will not be translated to LastRow, but Rank
+    util.verifyExecPlan(sql)
+  }
+
   @Test
   def testLastRowWithWindowOnRowtime(): Unit = {
     util.tableEnv.getConfig
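
The new test above pins the non-append-only fallback to a retract Rank. A hypothetical companion
case (not part of this commit) sketching the append-only path, which should still translate to a
Deduplicate exec node, in the same test style; the table name `src` is illustrative:

    // Hypothetical sketch: the same query shape over an insert-only source.
    @Test
    def testLastRowOnAppendOnlyInput(): Unit = {
      util.tableEnv.executeSql("""
                                 |create temporary table src (
                                 | a int,
                                 | b bigint,
                                 | ts timestamp_ltz(3),
                                 | watermark for ts as ts - interval '5' second
                                 |) with (
                                 | 'connector' = 'values',
                                 | 'changelog-mode' = 'I'
                                 |)""".stripMargin)
      // append-only input plus a single rowtime collation: the dedup conversion applies
      util.verifyExecPlan(
        """
          |SELECT *
          |FROM (
          |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY ts DESC) as rank_num
          |  FROM src)
          |WHERE rank_num = 1
          |""".stripMargin)
    }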
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
index e16f747dd39..fb43bd50c98 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
@@ -1625,68 +1625,80 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
 
   @TestTemplate
   def testProctimeDedupOnCdcWithMetadataSinkWithPk(): Unit = {
-    // TODO this should be updated after StreamPhysicalDeduplicate supports consuming update
-    assertThatThrownBy(
-      () =>
-        util.verifyExecPlanInsert(
-          """
-            |insert into sink_with_pk
-            |SELECT a, metadata_3, c
-            |FROM (
-            |  SELECT *,
-            |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
-            |  FROM cdc_with_meta
-            |)
-            |WHERE rowNum = 1
-      """.stripMargin))
-      .hasMessageContaining(
-        "StreamPhysicalDeduplicate doesn't support consuming update and delete 
changes")
-      .isInstanceOf[TableException]
+    // now deduplicate query with updating will translate to retract rank
+    val callable: ThrowingCallable = () =>
+      util.verifyExecPlanInsert(
+        """
+          |insert into sink_with_pk
+          |SELECT a, metadata_3, c
+          |FROM (
+          |  SELECT *,
+          |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+          |  FROM cdc_with_meta
+          |)
+          |WHERE rowNum = 1
+      """.stripMargin)
+
+    if (tryResolve) {
+      assertThatThrownBy(callable)
+        .hasMessageContaining(
+          "The column(s): $7(generated by non-deterministic function: PROCTIME 
) can not satisfy the determinism requirement")
+        .isInstanceOf[TableException]
+    } else {
+      assertThatCode(callable).doesNotThrowAnyException()
+    }
   }
 
   @TestTemplate
   def testProctimeDedupOnCdcWithMetadataSinkWithoutPk(): Unit = {
-    // TODO this should be updated after StreamPhysicalDeduplicate supports consuming update
-    assertThatThrownBy(
-      () =>
-        util.verifyExecPlanInsert(
-          """
-            |insert into sink_without_pk
-            |SELECT a, metadata_3, c
-            |FROM (
-            |  SELECT *,
-            |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
-            |  FROM cdc_with_meta
-            |)
-            |WHERE rowNum = 1
-      """.stripMargin
-        ))
-      .hasMessageContaining(
-        "StreamPhysicalDeduplicate doesn't support consuming update and delete 
changes")
-      .isInstanceOf[TableException]
+    // now deduplicate query with updating will translate to retract rank
+    val callable: ThrowingCallable = () =>
+      util.verifyExecPlanInsert(
+        """
+          |insert into sink_without_pk
+          |SELECT a, metadata_3, c
+          |FROM (
+          |  SELECT *,
+          |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+          |  FROM cdc_with_meta
+          |)
+          |WHERE rowNum = 1
+      """.stripMargin)
 
+    if (tryResolve) {
+      assertThatThrownBy(callable)
+        .hasMessageContaining(
+          "The column(s): $7(generated by non-deterministic function: PROCTIME 
) can not satisfy the determinism requirement")
+        .isInstanceOf[TableException]
+    } else {
+      assertThatCode(callable).doesNotThrowAnyException()
+    }
   }
 
   @TestTemplate
   def testRowtimeDedupOnCdcWithMetadataSinkWithPk(): Unit = {
-    // TODO this should be updated after StreamPhysicalDeduplicate supports consuming update
-    assertThatThrownBy(
-      () =>
-        util.verifyExecPlanInsert(
-          """
-            |insert into sink_with_pk
-            |SELECT a, b, c
-            |FROM (
-            |  SELECT *,
-            |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum
-            |  FROM cdc_with_meta_and_wm
-            |)
-            |WHERE rowNum = 1
-      """.stripMargin
-        ))
-      .hasMessageContaining(
-        "StreamPhysicalDeduplicate doesn't support consuming update and delete 
changes")
-      .isInstanceOf[TableException]
+    // now deduplicate query with updating will translate to retract rank
+    val callable: ThrowingCallable = () =>
+      util.verifyExecPlanInsert(
+        """
+          |insert into sink_with_pk
+          |SELECT a, b, c
+          |FROM (
+          |  SELECT *,
+          |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum
+          |  FROM cdc_with_meta_and_wm
+          |)
+          |WHERE rowNum = 1
+      """.stripMargin)
+
+    if (tryResolve) {
+      assertThatThrownBy(callable)
+        .hasMessageContaining(
+          "The metadata column(s): 'op_ts' in cdc source may cause wrong 
result or error on downstream operators")
+        .isInstanceOf[TableException]
+    } else {
+      assertThatCode(callable).doesNotThrowAnyException()
+    }
   }
 
   @TestTemplate
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 8cfec66a3cb..6ffe892ba85 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -842,14 +842,15 @@ class RankTest extends TableTestBase {
                                |CREATE VIEW v1 AS
                                |SELECT c, b, SUM(a) FILTER (WHERE a > 0) AS d FROM v0 GROUP BY c, b
                                |""".stripMargin)
-    util.verifyRelPlan("""
-                         |SELECT c, b, d
-                         |FROM (
-                         |    SELECT
-                         |       c, b, d,
-                         |       ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1
-                         |) WHERE rn < 10
-                         |""".stripMargin)
+    util.verifyExecPlan(
+      """
+        |SELECT c, b, d
+        |FROM (
+        |    SELECT
+        |       c, b, d,
+        |       ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1
+        |) WHERE rn < 10
+        |""".stripMargin)
   }
   @Test
   def testUpdatableRankAfterLookupJoin(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 2a1270f08c2..c16a413aff4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -1684,7 +1684,7 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
 
   @TestTemplate
   def testProctimeWindowTVFWithDedupWhenCantMerge(): Unit = {
-    util.verifyRelPlan(
+    util.verifyExecPlan(
       """
         |select c, count(a)
         |from (
