xuyangzhong commented on code in PR #25717:
URL: https://github.com/apache/flink/pull/25717#discussion_r1909886420


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala:
##########
@@ -451,4 +467,128 @@ class RankHarnessTest(mode: StateBackendMode) extends 
HarnessTestBase(mode) {
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
     testHarness.close()
   }
+
+  def prepareTop1Tester(query: String, operatorNameIdentifier: String)
+      : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], 
RowDataHarnessAssertor) = {
+    val sourceDDL =
+      s"""
+         |CREATE TEMPORARY TABLE T(
+         |  a STRING PRIMARY KEY NOT ENFORCED,
+         |  b BIGINT
+         |) WITH (
+         |  'connector' = 'values',
+         |  'changelog-mode' = 'I'
+         |)
+         |""".stripMargin
+    tEnv.executeSql(sourceDDL)
+
+    val t1 = tEnv.sqlQuery(query)
+
+    val testHarness =
+      createHarnessTester(t1.toRetractStream[Row], operatorNameIdentifier)
+    val assertor = new RowDataHarnessAssertor(
+      Array(
+        DataTypes.STRING().getLogicalType,
+        DataTypes.BIGINT().getLogicalType,
+        DataTypes.BIGINT().getLogicalType))
+
+    (testHarness, assertor)
+  }
+
+  @TestTemplate
+  def testAppendFastTop1(): Unit = {
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+    val query =
+      """
+        |SELECT a, b, rn
+        |FROM
+        |(
+        |    SELECT a, b,
+        |        ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn
+        |    FROM T
+        |) t1
+        |WHERE rn <= 1
+      """.stripMargin
+    val (testHarness, assertor) =
+      prepareTop1Tester(query, "Rank(strategy=[AppendFastStrategy")
+
+    if (enableAsyncState) {
+      assertThat(isAsyncStateOperator(testHarness)).isTrue
+    } else {
+      assertThat(isAsyncStateOperator(testHarness)).isFalse
+    }
+
+    testHarness.open()
+
+    testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "a", 1L: JLong))
+    testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong))
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+    expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong))
+
+    assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, 
result)
+
+    testHarness.close()
+  }
+
+  @TestTemplate
+  def testUpdateFastTop1(): Unit = {
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+    val query =
+      """
+        |SELECT a, b, rn
+        |FROM
+        |(
+        |    SELECT a, b,
+        |        ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn
+        |    FROM (
+        |       select a, count(*) as b from T group by a
+        |    ) t1
+        |) t2
+        |WHERE rn <= 1
+      """.stripMargin
+    val (testHarness, assertor) =
+      prepareTop1Tester(query, "Rank(strategy=[UpdateFastStrategy")
+
+    if (enableAsyncState) {
+      assertThat(isAsyncStateOperator(testHarness)).isTrue
+    } else {
+      assertThat(isAsyncStateOperator(testHarness)).isFalse
+    }
+
+    testHarness.open()
+
+    testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong))
+    testHarness.processElement(binaryRecord(UPDATE_AFTER, "a", 3L: JLong))
+    testHarness.processElement(binaryRecord(UPDATE_AFTER, "a", 4L: JLong))
+
+    val result = dropWatermarks(testHarness.getOutput.toArray)
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+    expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 3L: JLong))
+    expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 4L: JLong))
+

Review Comment:
   It's a changelog output and the final result is `"a", 4L: JLong`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to