[
https://issues.apache.org/jira/browse/FLINK-20272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17238065#comment-17238065
]
Jark Wu edited comment on FLINK-20272 at 11/24/20, 11:36 AM:
-------------------------------------------------------------
Fixed in master (1.12.0): 2fd73c140e58bf9a38bf583d06b32843d299a78a
was (Author: jark):
Fixed in master (1.12.0): FLINK-20272
> Fix wrong result for TopN when removing records out of TopN
> -----------------------------------------------------------
>
> Key: FLINK-20272
> URL: https://issues.apache.org/jira/browse/FLINK-20272
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.0, 1.12.0
> Reporter: Shengkai Fang
> Assignee: Jark Wu
> Priority: Major
> Fix For: 1.12.0
>
>
> Add the following test in the {{RetractableTopNFunctionTest}}.
> {code:java}
> @Test
> public void testCornerCase2() throws Exception {
>     AbstractTopNFunction func =
>         createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), false, false);
>     OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
>         createTestHarness(func);
>     testHarness.open();
>     testHarness.processElement(insertRecord("a", 1L, 1));
>     testHarness.processElement(insertRecord("a", 2L, 2));
>     testHarness.processElement(insertRecord("a", 3L, 2));
>     testHarness.processElement(insertRecord("a", 4L, 4));
>     testHarness.processElement(insertRecord("a", 5L, 4));
>     // This record currently has rank 4, i.e. it is outside the Top-2 range.
>     testHarness.processElement(deleteRecord("a", 4L, 4));
>     // These two records are inside the range, so later records move up.
>     testHarness.processElement(deleteRecord("a", 1L, 1));
>     testHarness.processElement(deleteRecord("a", 2L, 2));
>     testHarness.close();
>
>     List<Object> expectedOutput = new ArrayList<>();
>     expectedOutput.add(insertRecord("a", 1L, 1));
>     expectedOutput.add(insertRecord("a", 2L, 2));
>     expectedOutput.add(deleteRecord("a", 1L, 1));
>     expectedOutput.add(insertRecord("a", 3L, 2));
>     expectedOutput.add(deleteRecord("a", 2L, 2));
>     expectedOutput.add(insertRecord("a", 5L, 4));
>     assertorWithRowNumber.assertOutputEquals(
>         "output wrong.", expectedOutput, testHarness.getOutput());
> }
> {code}
> When the operator receives a delete message, it only deletes the record if
> its current rank is within the range. If we then keep deleting records that
> are within the range, the operator sends the record that was never deleted
> to the sink.
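
The behaviour described above can be reproduced with a small toy model. This is only a sketch under stated assumptions, not Flink's actual RetractableTopNFunction: the class TopNRetractSketch, its removeOutOfRange flag, and the "+I"/"-D" output strings are hypothetical names introduced for illustration. It keeps all buffered rows in one sorted list and replays the event sequence from the test, once ignoring deletes that fall outside the Top-2 range and once removing them from state.

```java
import java.util.ArrayList;
import java.util.List;

class TopNRetractSketch {

    /** A toy row: an id plus the key that the Top-N is sorted on. */
    static final class Row {
        final long id;
        final int sortKey;
        Row(long id, int sortKey) { this.id = id; this.sortKey = sortKey; }
        @Override public String toString() { return "(" + id + "," + sortKey + ")"; }
    }

    private final int topN;
    private final boolean removeOutOfRange; // hypothetical switch: false = behaviour described in the issue
    private final List<Row> sorted = new ArrayList<>(); // all buffered rows, ascending by sortKey
    final List<String> output = new ArrayList<>();      // emitted changelog, without rank numbers

    TopNRetractSketch(int topN, boolean removeOutOfRange) {
        this.topN = topN;
        this.removeOutOfRange = removeOutOfRange;
    }

    void insert(long id, int sortKey) {
        // Rows with an equal sort key keep their arrival order.
        int pos = 0;
        while (pos < sorted.size() && sorted.get(pos).sortKey <= sortKey) {
            pos++;
        }
        sorted.add(pos, new Row(id, sortKey));
        if (pos < topN) {
            if (sorted.size() > topN) {
                output.add("-D" + sorted.get(topN)); // row pushed out of the range
            }
            output.add("+I" + sorted.get(pos));
        }
    }

    void delete(long id, int sortKey) {
        int pos = -1;
        for (int i = 0; i < sorted.size(); i++) {
            Row r = sorted.get(i);
            if (r.id == id && r.sortKey == sortKey) { pos = i; break; }
        }
        if (pos < 0) {
            return; // unknown row, nothing to retract
        }
        if (pos >= topN) {
            // Outside the range: nothing is emitted, but the row must still leave the state.
            if (removeOutOfRange) {
                sorted.remove(pos);
            }
            return;
        }
        output.add("-D" + sorted.remove(pos));
        if (sorted.size() >= topN) {
            output.add("+I" + sorted.get(topN - 1)); // row that moved up into the range
        }
    }

    /** Replays the event sequence from testCornerCase2 against a Top-2. */
    static List<String> replay(boolean removeOutOfRange) {
        TopNRetractSketch top2 = new TopNRetractSketch(2, removeOutOfRange);
        top2.insert(1L, 1);
        top2.insert(2L, 2);
        top2.insert(3L, 2);
        top2.insert(4L, 4);
        top2.insert(5L, 4);
        top2.delete(4L, 4); // rank 4, outside the range
        top2.delete(1L, 1);
        top2.delete(2L, 2);
        return top2.output;
    }

    public static void main(String[] args) {
        System.out.println("ignore out-of-range delete: " + replay(false));
        System.out.println("remove out-of-range delete: " + replay(true));
    }
}
```

In this model, replay(false) ends with +I(4,4), a row that was already deleted, while replay(true) ends with +I(5,4), which matches the expected output of the test above.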