[ 
https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-24704:
-------------------------------
    Fix Version/s: 1.14.1

> Exception occurs when the input record loses monotonicity on the sort key 
> field of UpdatableTopNFunction
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-24704
>                 URL: https://issues.apache.org/jira/browse/FLINK-24704
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.14.1
>
>
> An IllegalArgumentException occurs when the input retract record's sort key 
> is lower than the old sort key, because this breaks the monotonicity of the 
> sort key field that the SQL semantics are expected to guarantee. A likely 
> cause is that an upstream stateful operator has a state TTL shorter than the 
> stream records require, so the stale record has already been cleared by 
> state TTL. 
> A case that reproduces it:
> {code:java|title=RankHarnessTest.java|borderStyle=solid}
> val sql =
>  """
>  |SELECT word, cnt, rank_num
>  |FROM (
>  | SELECT word, cnt,
>  | ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
>  | FROM (
>  | select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
>  | )
>  | )
>  |WHERE rank_num <= 6
>  """.stripMargin
> {code}
> When the aggregated result column `cnt` becomes lower for a key, the 
> downstream retract rank operator fails with the following exception:
>  
> {quote}java.lang.IllegalArgumentException
>  at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
>  at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
>  at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>  at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
> {quote}
> Here we should align with RetractableTopNFunction: continue processing by 
> default (accepting a possibly incorrect result), or fail over if so 
> configured once 
> [FLINK-24666|https://issues.apache.org/jira/browse/FLINK-24666] is addressed.
>  
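
The proposed behavior can be illustrated with a minimal, standalone sketch. It is not the Flink implementation: the class SortKeyMonotonicitySketch, the Handling enum, and the acceptUpdate method are hypothetical names chosen for illustration, and the actual configuration option introduced by FLINK-24666 is not shown. The sketch assumes ORDER BY cnt DESC, so monotonicity means a key's new cnt must not be lower than its old cnt.

```java
/**
 * Hypothetical illustration of "lenient by default, fail if configured".
 * None of these names come from Flink.
 */
final class SortKeyMonotonicitySketch {

    /** Hypothetical switch: LENIENT keeps processing, STRICT fails the job. */
    enum Handling { LENIENT, STRICT }

    /**
     * Checks whether an update keeps the sort key monotonic (non-decreasing cnt).
     *
     * @return true if monotonicity holds; false if it is violated and the
     *         violation is tolerated (the emitted result may then be incorrect)
     * @throws IllegalStateException if monotonicity is violated in STRICT mode
     */
    static boolean acceptUpdate(long oldCnt, long newCnt, Handling handling) {
        if (newCnt >= oldCnt) {
            return true; // normal case: the sort key did not decrease
        }
        String message = "Sort key lost monotonicity: old cnt=" + oldCnt
                + ", new cnt=" + newCnt
                + " (possibly because upstream state was cleared by state TTL)";
        if (handling == Handling.STRICT) {
            // Fail fast so the job can fail over instead of emitting wrong results.
            throw new IllegalStateException(message);
        }
        // Lenient mode: report the problem and let the caller keep processing.
        System.err.println("WARN: " + message);
        return false;
    }
}
```

With Handling.LENIENT, a call such as acceptUpdate(5L, 3L, Handling.LENIENT) logs a warning and returns false instead of throwing, which mirrors the "continue processing" default suggested above; Handling.STRICT corresponds to the configurable failover.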



--
This message was sent by Atlassian Jira
(v8.3.4#803005)