[
https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17436583#comment-17436583
]
Kurt Young commented on FLINK-24704:
------------------------------------
Thanks for reporting this [~lincoln.86xy]. Will this lead to incorrect results
or to an endless exception-failover-exception loop?
> Exception occurs when the input record loses monotonicity on the sort key
> field of UpdatableTopNFunction
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24704
> URL: https://issues.apache.org/jira/browse/FLINK-24704
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0
> Reporter: lincoln lee
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.15.0
>
>
> An IllegalArgumentException occurs when the input retract record's sort key
> is lower than the old sort key, because this breaks the monotonicity of the
> sort key field that the SQL semantics guarantee. A likely cause is that an
> upstream stateful operator has a shorter state TTL than the lifetime of the
> stream records, so the stale record was cleared by state TTL.
> A case that reproduces it:
> {code:title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>   """
>     |SELECT word, cnt, rank_num
>     |FROM (
>     |  SELECT word, cnt,
>     |    ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>     |  FROM (
>     |    select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
>     |  )
>     |)
>     |WHERE rank_num <= 6
>   """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the
> downstream rank operator (UpdatableTopNFunction) fails with this exception:
>
> {quote}java.lang.IllegalArgumentException
> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
> at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with RetractableTopNFunction: continue processing by
> default (possibly with an incorrect result), or make it configurable to
> fail over once FLINK-24666 is addressed.
>
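The two behaviors proposed in the description (keep processing by default, or fail when configured to) can be illustrated with a minimal sketch. This is not Flink's implementation: `SortKeyGuard`, `Mode`, `update`, and `violations` are hypothetical names invented for this example, and the real operator works on keyed state and row data rather than a plain map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of Flink's API. */
class SortKeyGuard {
    /** How to react when an update lowers the sort key of a row. */
    enum Mode { LENIENT, STRICT }

    private final Mode mode;
    // Last sort key seen per row key (for example, per word).
    private final Map<String, Long> lastSortKey = new HashMap<>();
    private int violations = 0;

    SortKeyGuard(Mode mode) {
        this.mode = mode;
    }

    /**
     * Records an update for rowKey. Returns true if the sort key did not
     * decrease. If it decreased, STRICT mode throws and LENIENT mode counts
     * the violation, stores the new value, and returns false.
     */
    boolean update(String rowKey, long sortKey) {
        Long old = lastSortKey.get(rowKey);
        boolean monotonic = old == null || sortKey >= old;
        if (!monotonic) {
            if (mode == Mode.STRICT) {
                throw new IllegalStateException(
                        "Sort key for '" + rowKey + "' dropped from " + old + " to " + sortKey);
            }
            violations++;
        }
        lastSortKey.put(rowKey, sortKey);
        return monotonic;
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        SortKeyGuard lenient = new SortKeyGuard(Mode.LENIENT);
        lenient.update("a", 5L);
        lenient.update("a", 8L);
        // Simulates upstream state being cleared, so the running sum restarts.
        lenient.update("a", 2L);
        System.out.println("violations = " + lenient.violations());
    }
}
```

In lenient mode the job keeps running but the ranking may be wrong for the affected key; in strict mode the error surfaces immediately, which can lead to repeated failover if the same record is replayed.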
--
This message was sent by Atlassian Jira
(v8.3.4#803005)