[ https://issues.apache.org/jira/browse/FLINK-26408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500024#comment-17500024 ]
Hengyu Dai commented on FLINK-26408:
------------------------------------
[~lzljs3620320] thanks for your reply!
I think this case is slightly different from FLINK-24654.
FLINK-24654 handles inconsistencies between dataState and treeMap, but in the
example above the function throws the "Can not retract a non-existent record.
This should never happen" exception regardless of whether the two states are
consistent.
I have studied the commits of FLINK-24654: the new method
{{processStateStaled}} is never invoked in my example.
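{code:java}
// code in retractRecordWithoutRowNumber
// this branch is only entered when the iterator reaches an entry whose key
// equals the sort key of the record being retracted
if (!findsSortKey && key.equals(sortKey)) {
    List<RowData> inputs = dataState.get(key);
    if (inputs == null) {
        // treeMap has the key but dataState does not: stale state (FLINK-24654)
        processStateStaled(iterator);
    } else {
        // ...
    }
}
{code}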
The condition {{key.equals(sortKey)}} is never met: the current sort key is 3
for -(a1,b1,3,jk1), while the sorted map holds only one entry, with key 5,
because +(a1,b2,5,jk2) arrived first.
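A stripped-down model of the quoted branch may make this easier to see. It is a sketch under assumptions, not Flink's implementation: {{RetractLoopSketch}}, {{retract}} and the {{staleCalls}} counter are hypothetical, sort keys are plain integers, rows are strings, and the final check simply restates the two conditions from the issue description (the real control flow is more involved). With only sort key 5 in the map and a retraction for 3, the {{key == sortKey}} branch never runs, so the stand-in for {{processStateStaled}} is never reached and the exception is thrown anyway.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, simplified model of the quoted branch; not Flink's implementation.
final class RetractLoopSketch {

    // Counts how often the stand-in for processStateStaled(iterator) would run.
    static int staleCalls = 0;

    static void retract(SortedMap<Integer, Long> treeMap,
                        Map<Integer, List<String>> dataState,
                        int sortKey) {
        boolean findsSortKey = false;
        Iterator<Map.Entry<Integer, Long>> iterator = treeMap.entrySet().iterator();
        while (iterator.hasNext()) {
            int key = iterator.next().getKey();
            // Mirrors `!findsSortKey && key.equals(sortKey)` from the snippet.
            if (!findsSortKey && key == sortKey) {
                if (dataState.get(key) == null) {
                    staleCalls++; // processStateStaled(iterator) would be called here
                } else {
                    findsSortKey = true; // the real code retracts the record here
                }
            }
        }
        // The two conditions from the issue description (assumed simplification).
        if (!treeMap.isEmpty() && !treeMap.containsKey(sortKey)) {
            throw new RuntimeException(
                "Can not retract a non-existent record. This should never happen.");
        }
    }

    public static void main(String[] args) {
        SortedMap<Integer, Long> treeMap = new TreeMap<>();
        treeMap.put(5, 1L); // +(a1,b2,5,jk2) arrived first
        Map<Integer, List<String>> dataState = new HashMap<>();
        dataState.put(5, List.of("+(a1,b2,5,jk2)"));
        try {
            retract(treeMap, dataState, 3); // -(a1,b1,3,jk1)
        } catch (RuntimeException e) {
            System.out.println("thrown: " + e.getMessage());
        }
        System.out.println("processStateStaled stand-in calls: " + staleCalls);
    }
}
```

The point of the model is that the FLINK-24654 fix lives inside the equality branch, so it cannot help when the retracted sort key is absent from the map altogether.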
> retract a non-existent record in RetractableTopNFunction
> ---------------------------------------------------------
>
> Key: FLINK-26408
> URL: https://issues.apache.org/jira/browse/FLINK-26408
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner, Table SQL / Runtime
> Affects Versions: 1.12.2
> Reporter: Hengyu Dai
> Priority: Blocker
>
> RetractableTopNFunction throws a RuntimeException when both of the following hold:
> # the sorted map {color:#0747a6}ValueState<SortedMap<RowData, Long>> treeMap{color} is not empty, and
> # the sorted map does not contain the current sort key.
> Consider the following Flink SQL job:
> {code:sql}
> -- table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)
> select
>   a_key, a_time, a_jk, b_key, b_time, b_jk
> from
> (
>   select
>     a_key, a_time, a_jk, b_key, b_time, b_jk,
>     row_number() over(partition by a_key order by a_time desc) as rn
>   from
>   (
>     select a_key, a_time, a_jk
>     from (
>       select *, row_number() over(partition by a_key order by a_time desc) as rn
>       from table_a
>     ) tmp1
>     where rn = 1
>   ) t1
>   left join
>   (
>     select b_key, b_time, b_jk
>     from (
>       select *, row_number() over(partition by b_key order by b_time desc) as rn
>       from table_b
>     ) tmp2
>     where rn = 1
>   ) t2
>   on t1.a_jk = t2.b_jk
> ) t3
> where rn = 1
> {code}
> The JobGraph looks like this:
> {{Source table_a -> Rank_a -> Join}}
> {{Source table_b -> Rank_b -> Join}}
> {{Join -> Final Rank}}
> Suppose we have the following input:
> ||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
> |t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
> |t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
> |t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
> |t4|+(a1,4,jk1)| |-(a1,3,jk1) \\ +(a1,4,jk1)| |-(a1,b1,3,jk1) \\ +(a1,b1,4,jk1)|-(a1,b1,3) \\ +(a1,b1,4)|
> |t5|+(a1,5,jk2)| |-(a1,4,jk1) \\ +(a1,5,jk2)| |-(a1,b1,4,jk1) \\ +(a1,b2,5,jk2)|-(a1,b1,4) \\ +(a1,b2,5)|
>
> Assume:
> # t4 and t5 are almost simultaneous, and the Join operator produces 4 messages at
> t4 and t5. Because the hash key changed (from jk1 to jk2), +(a1,b2,5,jk2) (hashed
> by jk2) may be emitted by a different task than the other 3 messages (hashed by
> jk1), so it may arrive at Final Rank before them.
> # Due to network congestion, high machine load, etc., the messages produced by the
> Join operator at t4 and t5 take a while to reach Final Rank. By the time Final Rank
> receives them, its state has expired because of the State TTL, so the treeMap
> state has been cleared.
> Now, if +(a1,b2,5,jk2) arrives at Final Rank first, sort key 5 is put into the
> sorted map of partition key a1. When -(a1,b1,3,jk1) then arrives at Final Rank,
> the sorted map is not empty and does not contain sort key 3, which meets both
> conditions for that RuntimeException.
> We hit this exception in our production environment (Flink version 1.12.2).
> It is very serious: once it happens, the job cannot recover automatically,
> because the state is polluted.
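The failure sequence from the issue can be replayed with a toy model. This is a sketch under several assumptions, not Flink code: {{FinalRankReplay}}, {{Msg}} and {{joinSubtask}} are hypothetical names; routing uses plain {{String.hashCode()}} modulo a parallelism of 2 instead of Flink's key-group assignment; the Final Rank state for a1 is modelled as a {{TreeMap}} of sort-key counts that starts empty because TTL cleared it; and the arrival order (jk2 channel first) is assumed, not guaranteed. Retracting from an empty map is simply ignored, since the issue does not describe that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Toy replay of the issue's scenario; hypothetical names, not Flink code.
final class FinalRankReplay {

    // One changelog message reaching Final Rank for partition key a1.
    static final class Msg {
        final char kind;    // '+' accumulate, '-' retract
        final int aTime;    // sort key used by Final Rank
        final String label; // printable form, e.g. "+(a1,b2,5,jk2)"

        Msg(char kind, int aTime, String label) {
            this.kind = kind;
            this.aTime = aTime;
            this.label = label;
        }

        @Override
        public String toString() {
            return label;
        }
    }

    // Simplified routing (assumption): hashCode modulo parallelism,
    // not Flink's key-group assignment.
    static int joinSubtask(String jk, int parallelism) {
        return Math.floorMod(jk.hashCode(), parallelism);
    }

    // Sort-key counts for partition key a1; starts empty because TTL cleared it.
    final SortedMap<Integer, Long> treeMap = new TreeMap<>();

    void process(Msg m) {
        int sortKey = m.aTime;
        if (m.kind == '+') {
            treeMap.merge(sortKey, 1L, Long::sum);
            return;
        }
        Long count = treeMap.get(sortKey);
        if (count != null) {
            if (count == 1L) {
                treeMap.remove(sortKey);
            } else {
                treeMap.put(sortKey, count - 1);
            }
        } else if (!treeMap.isEmpty()) {
            // The two conditions from the issue: map not empty, sort key missing.
            throw new RuntimeException(
                "Can not retract a non-existent record. This should never happen.");
        }
        // Empty map and missing key: ignored here (assumption, not covered by the issue).
    }

    public static void main(String[] args) {
        System.out.println("jk1 -> subtask " + joinSubtask("jk1", 2)
            + ", jk2 -> subtask " + joinSubtask("jk2", 2));

        // Join output at t4/t5, split by the channel it travels on.
        List<Msg> jk1Channel = List.of(
            new Msg('-', 3, "-(a1,b1,3,jk1)"),
            new Msg('+', 4, "+(a1,b1,4,jk1)"),
            new Msg('-', 4, "-(a1,b1,4,jk1)"));
        List<Msg> jk2Channel = List.of(new Msg('+', 5, "+(a1,b2,5,jk2)"));

        // Assumed arrival order: the jk2 channel is delivered first.
        List<Msg> arrival = new ArrayList<>(jk2Channel);
        arrival.addAll(jk1Channel);

        FinalRankReplay finalRank = new FinalRankReplay();
        for (Msg m : arrival) {
            try {
                finalRank.process(m);
                System.out.println("ok   " + m);
            } catch (RuntimeException e) {
                System.out.println("FAIL " + m + ": " + e.getMessage());
                break;
            }
        }
    }
}
```

In this model, seeding {{treeMap}} with {{3 -> 1L}} (the state before TTL expiry) lets the same arrival order go through without an exception, which is why both assumptions are needed to reach the failure.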
--
This message was sent by Atlassian Jira
(v8.20.1#820001)