[ 
https://issues.apache.org/jira/browse/FLINK-20272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-20272.
---------------------------
    Fix Version/s: 1.12.0
       Resolution: Fixed

Fixed in master (1.12.0): FLINK-20272

> Fix wrong result for TopN when removing records out of TopN
> -----------------------------------------------------------
>
>                 Key: FLINK-20272
>                 URL: https://issues.apache.org/jira/browse/FLINK-20272
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: Shengkai Fang
>            Assignee: Jark Wu
>            Priority: Major
>             Fix For: 1.12.0
>
>
> Add the following test in the {{RetractableTopNFunctionTest}}.
> {code:java}
> @Test
> public void testCornerCase2() throws Exception {
>       AbstractTopNFunction func = createFunction(RankType.ROW_NUMBER, new 
> ConstantRankRange(1, 2), false,
>                               false);
>       OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = 
> createTestHarness(func);
>       testHarness.open();
>       testHarness.processElement(insertRecord("a", 1L, 1));
>       testHarness.processElement(insertRecord("a", 2L, 2));
>       testHarness.processElement(insertRecord("a", 3L, 2));
>       testHarness.processElement(insertRecord("a", 4L, 4));
>       testHarness.processElement(insertRecord("a", 5L, 4));
>       testHarness.processElement(deleteRecord("a", 4L, 4));
>       testHarness.processElement(deleteRecord("a", 1L, 1));
>       testHarness.processElement(deleteRecord("a", 2L, 2));
>       testHarness.close();
>       List<Object> expectedOutput = new ArrayList<>();
>       expectedOutput.add(insertRecord("a", 1L, 1));
>       expectedOutput.add(insertRecord("a", 2L, 2));
>       expectedOutput.add(deleteRecord("a", 1L, 1));
>       expectedOutput.add(insertRecord("a", 3L, 2));
>       expectedOutput.add(deleteRecord("a", 2L, 2));
>       expectedOutput.add(insertRecord("a", 5L, 4));
>       assertorWithRowNumber.assertOutputEquals("output wrong.", 
> expectedOutput, testHarness.getOutput());
> }
> {code}
> When the operator gets delete message, it will only delete the record whose 
> current rank is in the range. If we keep deleting the message in the range, 
> the operator will send the undeleted message to the sink.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to