jnh5y commented on code in PR #25372:
URL: https://github.com/apache/flink/pull/25372#discussion_r1771333090


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.scala:
##########
@@ -272,4 +280,163 @@ class DeduplicateTest extends TableTestBase {
     util.verifyExecPlan(sqlQuery)
   }
 
+  @Test
+  def testRankConsumeChangelogGroupAggregate(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT a, COUNT(b) as b FROM MyTable GROUP BY a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogDistinct(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT DISTINCT a, b FROM MyTable)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogLeftOuterJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT m.a, m.b FROM MyTable m LEFT OUTER JOIN MyTable2 m2 ON m.a = m2.a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogRightOuterJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT m.a, m.b FROM MyTable m RIGHT OUTER JOIN MyTable2 m2 ON m.a = m2.a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogFullOuterJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT m.a, m.b FROM MyTable m FULL OUTER JOIN MyTable2 m2 ON m.a = m2.a)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogUnionGroupAggregate(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT DISTINCT a, b FROM MyTable
+        |        UNION ALL
+        |        SELECT count(a), b FROM MyTable GROUP BY b)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogIntersectSemiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT DISTINCT a, b FROM MyTable
+        |        INTERSECT
+        |        SELECT a, b FROM MyTable2)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogExceptAntiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT DISTINCT a, b FROM MyTable
+        |        EXCEPT
+        |        SELECT a, b FROM MyTable2)
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogExistsAntiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT a, b FROM MyTable
+        |        WHERE NOT EXISTS (
+        |        SELECT a, b FROM MyTable2))
+        |)
+        |WHERE rowNum = 1
+      """.stripMargin
+    util.verifyExecPlan(sqlQuery)
+  }
+
+  @Test
+  def testRankConsumeChangelogExistsSemiJoin(): Unit = {
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM (
+        |  SELECT *,
+        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum
+        |  FROM (SELECT a, b FROM MyTable

Review Comment:
   In all of these test cases, could the subquery that is the target of the `FROM` be extracted as a CTE? (Though that may make the tests too abstract.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to