[
https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jingsong Lee closed FLINK-24704.
--------------------------------
Resolution: Fixed
master: 33aaabebc63f7a44fb64930f5daf8006ce030752
release-1.14: d31cfb9b1b206270f5ccdb456c84d8c639e0524d
> Exception occurs when the input record loses monotonicity on the sort key
> field of UpdatableTopNFunction
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24704
> URL: https://issues.apache.org/jira/browse/FLINK-24704
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1
>
>
> An IllegalArgumentException occurs when the sort key of an input retract
> record is lower than the old sort key, because this breaks the monotonicity of
> the sort key field that the SQL semantics guarantee. The most likely cause is
> that an upstream stateful operator has a state TTL shorter than the lifetime of
> the stream records, so the stale record was cleared by state TTL.
> A case that reproduces it:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
> """
> |SELECT word, cnt, rank_num
> |FROM (
> | SELECT word, cnt,
> | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
> | FROM (
> | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
> | )
> | )
> |WHERE rank_num <= 6
> """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the
> downstream retract rank operator fails with the following exception:
>
> {quote}java.lang.IllegalArgumentException
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with RetractableTopNFunction: continue processing by
> default (accepting a possibly incorrect result), with an option to fail over
> instead once
> [FLINK-24666|https://issues.apache.org/jira/browse/FLINK-24666] is addressed.
>
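The behaviour proposed in the description can be illustrated with a small standalone sketch. It is not Flink code: `MonotonicSortKeyGuard` and its `Handling` enum are hypothetical names invented for this example, and the real option is whatever FLINK-24666 ends up defining. The sketch assumes a numeric sort key that is expected never to decrease for a given row key, and shows the two policies side by side: count the violation and keep going, or fail.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink: remembers the last sort key seen per row key. */
class MonotonicSortKeyGuard {

    /** Hypothetical policy names, chosen for this sketch only. */
    enum Handling { CONTINUE, FAIL }

    private final Handling handling;
    private final Map<String, Long> lastSortKey = new HashMap<>();
    private long violations = 0;

    MonotonicSortKeyGuard(Handling handling) {
        this.handling = handling;
    }

    /**
     * Records an update for rowKey.
     * Returns true if the sort key did not decrease, false if monotonicity
     * was broken and the guard is lenient. Throws in FAIL mode.
     */
    boolean accept(String rowKey, long sortKey) {
        Long previous = lastSortKey.get(rowKey);
        boolean monotonic = previous == null || sortKey >= previous;
        if (!monotonic) {
            violations++;
            if (handling == Handling.FAIL) {
                throw new IllegalStateException(
                        "Sort key for '" + rowKey + "' dropped from " + previous + " to " + sortKey);
            }
        }
        // In lenient mode the stale value is overwritten and processing continues,
        // so later results may be incorrect, as the description notes.
        lastSortKey.put(rowKey, sortKey);
        return monotonic;
    }

    long violations() {
        return violations;
    }
}
```

With `Handling.CONTINUE`, feeding the updates 5, 7, 3 for the same row key returns true, true, false and leaves the violation counter at 1; with `Handling.FAIL`, the third update throws instead.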
--
This message was sent by Atlassian Jira
(v8.20.1#820001)