xuyangzhong commented on code in PR #25717: URL: https://github.com/apache/flink/pull/25717#discussion_r1909891982
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala: ########## @@ -451,4 +467,128 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) { assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result) testHarness.close() } + + def prepareTop1Tester(query: String, operatorNameIdentifier: String) + : (KeyedOneInputStreamOperatorTestHarness[RowData, RowData, RowData], RowDataHarnessAssertor) = { + val sourceDDL = + s""" + |CREATE TEMPORARY TABLE T( + | a STRING PRIMARY KEY NOT ENFORCED, + | b BIGINT + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'I' + |) + |""".stripMargin + tEnv.executeSql(sourceDDL) + + val t1 = tEnv.sqlQuery(query) + + val testHarness = + createHarnessTester(t1.toRetractStream[Row], operatorNameIdentifier) + val assertor = new RowDataHarnessAssertor( + Array( + DataTypes.STRING().getLogicalType, + DataTypes.BIGINT().getLogicalType, + DataTypes.BIGINT().getLogicalType)) + + (testHarness, assertor) + } + + @TestTemplate + def testAppendFastTop1(): Unit = { + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) + val query = + """ + |SELECT a, b, rn + |FROM + |( + | SELECT a, b, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn + | FROM T + |) t1 + |WHERE rn <= 1 + """.stripMargin + val (testHarness, assertor) = + prepareTop1Tester(query, "Rank(strategy=[AppendFastStrategy") + + if (enableAsyncState) { + assertThat(isAsyncStateOperator(testHarness)).isTrue + } else { + assertThat(isAsyncStateOperator(testHarness)).isFalse + } + + testHarness.open() + + testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong)) + testHarness.processElement(binaryRecord(INSERT, "a", 1L: JLong)) + testHarness.processElement(binaryRecord(INSERT, "a", 3L: JLong)) + + val result = dropWatermarks(testHarness.getOutput.toArray) + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + 
expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong)) + + assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result) + + testHarness.close() + } + + @TestTemplate + def testUpdateFastTop1(): Unit = { + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) + val query = + """ + |SELECT a, b, rn + |FROM + |( + | SELECT a, b, + | ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn + | FROM ( + | select a, count(*) as b from T group by a + | ) t1 + |) t2 + |WHERE rn <= 1 + """.stripMargin + val (testHarness, assertor) = + prepareTop1Tester(query, "Rank(strategy=[UpdateFastStrategy") + + if (enableAsyncState) { + assertThat(isAsyncStateOperator(testHarness)).isTrue + } else { + assertThat(isAsyncStateOperator(testHarness)).isFalse + } + + testHarness.open() + + testHarness.processElement(binaryRecord(INSERT, "a", 2L: JLong)) + testHarness.processElement(binaryRecord(UPDATE_AFTER, "a", 3L: JLong)) + testHarness.processElement(binaryRecord(UPDATE_AFTER, "a", 4L: JLong)) + + val result = dropWatermarks(testHarness.getOutput.toArray) + + val expectedOutput = new ConcurrentLinkedQueue[Object]() + expectedOutput.add(binaryRecord(INSERT, "a", 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 2L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 3L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_BEFORE, "a", 3L: JLong)) + expectedOutput.add(binaryRecord(UPDATE_AFTER, "a", 4L: JLong)) + Review Comment: All input data is sent directly to Rank, not to Agg. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org