[
https://issues.apache.org/jira/browse/FLINK-20272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu closed FLINK-20272.
---------------------------
Fix Version/s: 1.12.0
Resolution: Fixed
Fixed in master (1.12.0): FLINK-20272
> Fix wrong result for TopN when removing records out of TopN
> -----------------------------------------------------------
>
> Key: FLINK-20272
> URL: https://issues.apache.org/jira/browse/FLINK-20272
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0, 1.12.0
> Reporter: Shengkai Fang
> Assignee: Jark Wu
> Priority: Major
> Fix For: 1.12.0
>
>
> To reproduce, add the following test to {{RetractableTopNFunctionTest}}.
> {code:java}
> @Test
> public void testCornerCase2() throws Exception {
>     // Rank range 1..2: only the top 2 rows are emitted.
>     AbstractTopNFunction func =
>             createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), false, false);
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>             createTestHarness(func);
>     testHarness.open();
>     testHarness.processElement(insertRecord("a", 1L, 1));
>     testHarness.processElement(insertRecord("a", 2L, 2));
>     testHarness.processElement(insertRecord("a", 3L, 2));
>     testHarness.processElement(insertRecord("a", 4L, 4));
>     testHarness.processElement(insertRecord("a", 5L, 4));
>     // ("a", 4L, 4) is outside the top 2 when this delete arrives.
>     testHarness.processElement(deleteRecord("a", 4L, 4));
>     // These two deletes hit records that are inside the top 2.
>     testHarness.processElement(deleteRecord("a", 1L, 1));
>     testHarness.processElement(deleteRecord("a", 2L, 2));
>     testHarness.close();
>
>     // The deleted ("a", 4L, 4) must never appear in the output.
>     List<Object> expectedOutput = new ArrayList<>();
>     expectedOutput.add(insertRecord("a", 1L, 1));
>     expectedOutput.add(insertRecord("a", 2L, 2));
>     expectedOutput.add(deleteRecord("a", 1L, 1));
>     expectedOutput.add(insertRecord("a", 3L, 2));
>     expectedOutput.add(deleteRecord("a", 2L, 2));
>     expectedOutput.add(insertRecord("a", 5L, 4));
>     assertorWithRowNumber.assertOutputEquals(
>             "output wrong.", expectedOutput, testHarness.getOutput());
> }
> {code}
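> When the operator receives a delete message, it only removes the record if
> the record's current rank is within the rank range. A record deleted while
> it ranks outside the range therefore stays in state. If records inside the
> range are then deleted, the stale record moves up into the range and the
> operator sends it to the sink even though it has already been deleted.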
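
The behaviour described above can be illustrated with a small standalone model. This is only a sketch under simplifying assumptions, not Flink's implementation: the class {{TopNSketch}}, its {{removeOutOfRange}} flag, and the string changelog format are all hypothetical, the partition key is omitted, and the top 2 is recomputed and diffed after every change. With the flag set to false, a delete is applied only when the record is currently in the top 2, which mirrors the bug; with true, every delete is applied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of a retractable Top-2 operator (not Flink code).
class TopNSketch {
    static final int TOP_N = 2;

    // sort key -> ids that share this key, in arrival order
    private final TreeMap<Integer, List<Long>> state = new TreeMap<>();
    private List<String> currentTop = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();
    private final boolean removeOutOfRange;

    TopNSketch(boolean removeOutOfRange) {
        this.removeOutOfRange = removeOutOfRange;
    }

    void insert(long id, int key) {
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(id);
        emitDiff();
    }

    void delete(long id, int key) {
        boolean inRange = currentTop.contains(format(id, key));
        // Buggy variant: a delete for a record outside the top N is ignored.
        if (inRange || removeOutOfRange) {
            List<Long> ids = state.get(key);
            if (ids != null) {
                ids.remove(Long.valueOf(id));
                if (ids.isEmpty()) {
                    state.remove(key);
                }
            }
        }
        emitDiff();
    }

    // Recompute the top N from state and emit what left (-D) and entered (+I).
    private void emitDiff() {
        List<String> newTop = new ArrayList<>();
        for (Map.Entry<Integer, List<Long>> e : state.entrySet()) {
            for (long id : e.getValue()) {
                if (newTop.size() < TOP_N) {
                    newTop.add(format(id, e.getKey()));
                }
            }
        }
        for (String r : currentTop) {
            if (!newTop.contains(r)) emitted.add("-D" + r);
        }
        for (String r : newTop) {
            if (!currentTop.contains(r)) emitted.add("+I" + r);
        }
        currentTop = newTop;
    }

    static String format(long id, int key) {
        return "(" + id + "," + key + ")";
    }

    // Replays the insert/delete sequence from the test in the description.
    static List<String> run(boolean removeOutOfRange) {
        TopNSketch op = new TopNSketch(removeOutOfRange);
        op.insert(1, 1);
        op.insert(2, 2);
        op.insert(3, 2);
        op.insert(4, 4);
        op.insert(5, 4);
        op.delete(4, 4); // outside the top 2 at this point
        op.delete(1, 1);
        op.delete(2, 2);
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + run(false));
        System.out.println("fixed: " + run(true));
    }
}
```

In this model the buggy run ends with +I(4,4), a record that was already deleted, while the fixed run ends with +I(5,4), matching the expected output of the test.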
--
This message was sent by Atlassian Jira
(v8.3.4#803005)