Shengkai Fang created FLINK-20272:
-------------------------------------
Summary: Wrong result of Rank Function
Key: FLINK-20272
URL: https://issues.apache.org/jira/browse/FLINK-20272
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.11.0, 1.12.0
Reporter: Shengkai Fang
Add the following test to {{RetractableTopNFunctionTest}}:
{code:java}
@Test
public void testCornerCase2() throws Exception {
    // Keep only the top 2 rows (ROW_NUMBER ranks 1 to 2).
    AbstractTopNFunction func =
            createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), false, false);
    OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
            createTestHarness(func);
    testHarness.open();
    testHarness.processElement(insertRecord("a", 1L, 1));
    testHarness.processElement(insertRecord("a", 2L, 2));
    testHarness.processElement(insertRecord("a", 3L, 2));
    testHarness.processElement(insertRecord("a", 4L, 4));
    testHarness.processElement(insertRecord("a", 5L, 4));
    // This row is outside the top 2 at the time it is deleted.
    testHarness.processElement(deleteRecord("a", 4L, 4));
    // Deleting the in-range rows should promote 3L and then 5L.
    testHarness.processElement(deleteRecord("a", 1L, 1));
    testHarness.processElement(deleteRecord("a", 2L, 2));
    testHarness.close();

    List<Object> expectedOutput = new ArrayList<>();
    expectedOutput.add(insertRecord("a", 1L, 1));
    expectedOutput.add(insertRecord("a", 2L, 2));
    expectedOutput.add(deleteRecord("a", 1L, 1));
    expectedOutput.add(insertRecord("a", 3L, 2));
    expectedOutput.add(deleteRecord("a", 2L, 2));
    expectedOutput.add(insertRecord("a", 5L, 4));
    assertorWithRowNumber.assertOutputEquals(
            "output wrong.", expectedOutput, testHarness.getOutput());
}
{code}
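When the operator receives a delete message, it only deletes the record if the
record's current rank is within the rank range; a delete for a record outside
the range leaves that record in place. If we then keep deleting the records
inside the range, the operator promotes the undeleted record into the range and
sends it to the sink. In the test above, the delete for ("a", 4L, 4) arrives
while that row is ranked outside the top 2, so the row would later be emitted
in place of the expected ("a", 5L, 4).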
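
The behaviour can be reproduced outside Flink with a small stand-alone model.
The sketch below is only an illustration under stated assumptions: the class
{{TopNRetractSketch}}, its methods and the {{deleteOnlyInRange}} flag are
hypothetical names, and the model is not the actual {{RetractableTopNFunction}}
code. It keeps rows grouped by sort key, replays the same inserts and deletes
as the test, and compares "ignore deletes outside the range" with "always
delete".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model of a retractable top-N; not Flink code.
class TopNRetractSketch {
    // sort key -> ids sharing that key, in arrival order
    private final TreeMap<Integer, List<Long>> state = new TreeMap<>();
    private final int topN;
    // Assumption: true models the reported behaviour, false the expected one.
    private final boolean deleteOnlyInRange;

    TopNRetractSketch(int topN, boolean deleteOnlyInRange) {
        this.topN = topN;
        this.deleteOnlyInRange = deleteOnlyInRange;
    }

    void insert(long id, int sortKey) {
        state.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(id);
    }

    void delete(long id, int sortKey) {
        if (deleteOnlyInRange && !topIds().contains(id)) {
            return; // delete is dropped, the row stays in state
        }
        List<Long> ids = state.get(sortKey);
        if (ids != null) {
            ids.remove(Long.valueOf(id));
            if (ids.isEmpty()) {
                state.remove(sortKey);
            }
        }
    }

    // Ids currently ranked 1..topN, ordered by sort key then arrival.
    List<Long> topIds() {
        List<Long> result = new ArrayList<>();
        for (List<Long> ids : state.values()) {
            for (Long id : ids) {
                if (result.size() == topN) {
                    return result;
                }
                result.add(id);
            }
        }
        return result;
    }

    // Replays the input of testCornerCase2 and returns the final top 2.
    static List<Long> replay(boolean deleteOnlyInRange) {
        TopNRetractSketch s = new TopNRetractSketch(2, deleteOnlyInRange);
        s.insert(1L, 1);
        s.insert(2L, 2);
        s.insert(3L, 2);
        s.insert(4L, 4);
        s.insert(5L, 4);
        s.delete(4L, 4); // rank 4 at this point, outside the top 2
        s.delete(1L, 1);
        s.delete(2L, 2);
        return s.topIds();
    }

    public static void main(String[] args) {
        System.out.println(replay(true));  // prints [3, 4]
        System.out.println(replay(false)); // prints [3, 5]
    }
}
```

With the range check enabled, the model ends with rows 3 and 4 in the top 2,
although row 4 was deleted; without it, the top 2 is rows 3 and 5, which
matches the expected output of the test.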