[
https://issues.apache.org/jira/browse/FLINK-28019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jingsong Lee reassigned FLINK-28019:
------------------------------------
Assignee: lincoln lee
> Error occurred when retract a staled record when enable state ttl in
> RetractableTopNFunction
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-28019
> URL: https://issues.apache.org/jira/browse/FLINK-28019
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.15.0, 1.14.4
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
>
> We found an error occurred when retract a staled record when enable state ttl
> in RetractableTopNFunction, a reproduce case:
> {code}
> @Test
> public void testRetractAnStaledRecordWithRowNumber() throws Exception {
> StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1_000);
> AbstractTopNFunction func =
> new RetractableTopNFunction(
> ttlConfig,
> InternalTypeInfo.ofFields(
> VarCharType.STRING_TYPE, new BigIntType(),
> new IntType()),
> comparableRecordComparator,
> sortKeySelector,
> RankType.ROW_NUMBER,
> new ConstantRankRange(1, 2),
> generatedEqualiser,
> true,
> true);
> OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
> createTestHarness(func);
> testHarness.open();
> testHarness.setStateTtlProcessingTime(0);
> testHarness.processElement(insertRecord("a", 1L, 10));
> testHarness.setStateTtlProcessingTime(1001);
> testHarness.processElement(insertRecord("a", 2L, 11));
> testHarness.processElement(deleteRecord("a", 1L, 10));
> testHarness.close();
> List<Object> expectedOutput = new ArrayList<>();
> expectedOutput.add(insertRecord("a", 1L, 10, 1L));
> expectedOutput.add(insertRecord("a", 2L, 11, 1L));
> // the following delete record should not be sent because the left
> row is null which is
> // illegal.
> // -D{row1=null, row2=+I(1)};
> assertorWithRowNumber.assertOutputEquals(
> "output wrong.", expectedOutput, testHarness.getOutput());
> }
> {code}
> the reason is the uncomplete path when deal with staled records.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)