[ https://issues.apache.org/jira/browse/FLINK-24704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
lincoln lee updated FLINK-24704:
--------------------------------
Description:
An IllegalArgumentException is thrown when the sort key of an incoming retract
record is lower than the old sort key. Such a record breaks the monotonicity of
the sort key field that the SQL semantics guarantee. The most likely cause is an
upstream stateful operator whose state TTL is shorter than the lifetime of the
stream records, so stale state is cleared by the TTL and the recomputed value
for that key starts over from a lower value.
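To illustrate the suspected cause, here is a minimal Java sketch (not Flink
code) of a per-key SUM(cnt) FILTER (WHERE cnt > 0) whose state expires after a
TTL. The class TtlSumSketch, its logical clock, and the access-based expiry rule
are assumptions made for illustration; Flink's real state TTL handling differs
in detail. It shows how, once the accumulator for a key expires, the next result
for that key restarts below the value the downstream rank last saw.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Flink code: per-key SUM(cnt) FILTER (WHERE cnt > 0) with a state TTL. */
public class TtlSumSketch {
    private final long ttlMillis;
    private final Map<String, Long> sums = new HashMap<>();
    private final Map<String, Long> lastAccess = new HashMap<>();

    public TtlSumSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Applies one input row at logical time {@code now} and returns the new aggregate. */
    public long add(String key, long cnt, long now) {
        Long last = lastAccess.get(key);
        if (last != null && now - last > ttlMillis) {
            // Assumed expiry rule: state idle for longer than the TTL is dropped,
            // so the sum for this key restarts from zero.
            sums.remove(key);
        }
        lastAccess.put(key, now);
        long sum = sums.getOrDefault(key, 0L);
        if (cnt > 0) { // FILTER (WHERE cnt > 0)
            sum += cnt;
        }
        sums.put(key, sum);
        return sum;
    }

    public static void main(String[] args) {
        TtlSumSketch agg = new TtlSumSketch(1_000);
        System.out.println(agg.add("a", 5, 0));     // 5
        System.out.println(agg.add("a", 3, 500));   // 8
        // Idle for 1500 ms > TTL: the accumulator expires before this row arrives.
        System.out.println(agg.add("a", 2, 2_000)); // 2
    }
}
```

The third result (2) is lower than the previous one (8) for the same key, which
is the kind of input the downstream rank operator does not expect.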
A reproducible case:
{code:scala|title=RankHarnessTest.scala|borderStyle=solid}
val sql =
  """
    |SELECT word, cnt, rank_num
    |FROM (
    |  SELECT word, cnt,
    |    ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) AS rank_num
    |  FROM (
    |    SELECT word, type, SUM(cnt) FILTER (WHERE cnt > 0) cnt
    |    FROM T GROUP BY word, type
    |  )
    |)
    |WHERE rank_num <= 6
  """.stripMargin
{code}
When the aggregated result column `cnt` becomes lower for a key, the downstream
rank operator (UpdatableTopNFunction) fails with the following exception:
{quote}java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.emitRecordsWithRowNumber(UpdatableTopNFunction.java:399)
    at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElementWithRowNumber(UpdatableTopNFunction.java:274)
    at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:167)
    at org.apache.flink.table.runtime.operators.rank.UpdatableTopNFunction.processElement(UpdatableTopNFunction.java:69)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:209)
{quote}
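A hypothetical sketch of the invariant the failing checkArgument appears to
protect: since SUM over positive values only grows, an update for a row ordered
by cnt DESC is expected to keep or raise its sort key. MonotonicRankSketch and
its local checkArgument helper are illustrative stand-ins, not the actual
UpdatableTopNFunction logic.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not Flink code: an update rank that assumes the sort key
 * (cnt, ordered DESC) of a row key never decreases.
 */
public class MonotonicRankSketch {
    private final Map<String, Long> sortKeyByRowKey = new HashMap<>();

    /** Local stand-in for Preconditions.checkArgument: throws IllegalArgumentException. */
    private static void checkArgument(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException();
        }
    }

    public void update(String rowKey, long cnt) {
        Long old = sortKeyByRowKey.get(rowKey);
        if (old != null) {
            // A lower cnt would move the row down the ranking, which the
            // monotonicity assumption rules out.
            checkArgument(cnt >= old);
        }
        sortKeyByRowKey.put(rowKey, cnt);
    }

    public static void main(String[] args) {
        MonotonicRankSketch rank = new MonotonicRankSketch();
        rank.update("a", 5);
        rank.update("a", 8);
        try {
            rank.update("a", 2); // e.g. after the upstream aggregate state expired
        } catch (IllegalArgumentException e) {
            System.out.println("monotonicity violated: " + e);
        }
    }
}
```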
UpdatableTopNFunction should align with RetractableTopNFunction here: by default
it should continue processing (accepting a possibly incorrect result), and once
[FLINK-24666|https://issues.apache.org/jira/browse/FLINK-24666] is addressed it
should be configurable to fail over instead.
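A sketch of the proposed behavior, assuming a hypothetical OnViolation switch
(not an existing Flink option): by default the operator keeps processing and
counts the violation, and when configured to fail it throws instead, which
would then trigger a failover. LenientRankSketch is an illustrative name only.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not Flink code: lenient vs. strict handling of a decreasing sort key. */
public class LenientRankSketch {
    /** Hypothetical switch; not an existing Flink configuration option. */
    public enum OnViolation { CONTINUE, FAIL }

    private final OnViolation policy;
    private final Map<String, Long> sortKeyByRowKey = new HashMap<>();
    private long violations;

    public LenientRankSketch(OnViolation policy) {
        this.policy = policy;
    }

    public void update(String rowKey, long cnt) {
        Long old = sortKeyByRowKey.get(rowKey);
        if (old != null && cnt < old) {
            if (policy == OnViolation.FAIL) {
                // Strict mode: surface the problem so the job fails over.
                throw new IllegalStateException(
                        "Sort key of " + rowKey + " decreased from " + old + " to " + cnt);
            }
            // Lenient default: count it (a real operator would log a warning)
            // and keep going, accepting that the ranking may now be incorrect.
            violations++;
        }
        sortKeyByRowKey.put(rowKey, cnt);
    }

    public long violations() {
        return violations;
    }

    public static void main(String[] args) {
        LenientRankSketch lenient = new LenientRankSketch(OnViolation.CONTINUE);
        lenient.update("a", 8);
        lenient.update("a", 2);
        System.out.println("lenient violations: " + lenient.violations()); // 1

        LenientRankSketch strict = new LenientRankSketch(OnViolation.FAIL);
        strict.update("a", 8);
        try {
            strict.update("a", 2);
        } catch (IllegalStateException e) {
            System.out.println("strict: " + e.getMessage());
        }
    }
}
```

Keeping the lenient path as the default mirrors the "continue processing" behavior
described above, while the strict path gives users a way to detect the problem.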
> Exception occurs when the input record loses monotonicity on the sort key
> field of UpdatableTopNFunction
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24704
> URL: https://issues.apache.org/jira/browse/FLINK-24704
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0
> Reporter: lincoln lee
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.15.0
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)