Hengyu Dai created FLINK-26408:
----------------------------------
Summary: retract a non-existent record in RetractableTopNFunction
Key: FLINK-26408
URL: https://issues.apache.org/jira/browse/FLINK-26408
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Hengyu Dai
{{RetractableTopNFunction}} throws a {{RuntimeException}} when it retracts a record and:
# the sorted map {{ValueState<SortedMap<RowData, Long>> treeMap}} is not empty, and
# the sorted map does not contain the current sort key.
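A minimal sketch of the two conditions above, in plain Java rather than Flink code. It is a hypothetical, simplified model ({{RetractCheck}}, {{classifyRetract}} and {{Outcome}} are made-up names): the {{RowData}} sort key is replaced by a {{long}}, and the map value holds the number of records per sort key. How Flink actually handles the empty-map case is not modelled here.

{code:java}
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Hypothetical, simplified model of the retraction branch discussed above.
 * Sort key = a long (e.g. a_time); value = number of records with that sort key.
 */
public class RetractCheck {

    /** Possible outcomes of a retraction in this simplified model. */
    enum Outcome { DECREMENTED, STATE_CLEARED, NON_EXISTENT_RECORD }

    static Outcome classifyRetract(SortedMap<Long, Long> treeMap, long sortKey) {
        Long count = treeMap.get(sortKey);
        if (count != null) {
            // Known sort key: decrement its count and drop the entry when it reaches zero.
            if (count <= 1) {
                treeMap.remove(sortKey);
            } else {
                treeMap.put(sortKey, count - 1);
            }
            return Outcome.DECREMENTED;
        }
        // Unknown sort key. An empty map looks like cleared (e.g. expired) state;
        // a non-empty map that lacks the key is the case reported in this issue.
        return treeMap.isEmpty() ? Outcome.STATE_CLEARED : Outcome.NON_EXISTENT_RECORD;
    }

    public static void main(String[] args) {
        SortedMap<Long, Long> treeMap = new TreeMap<>();
        System.out.println(classifyRetract(treeMap, 3L)); // STATE_CLEARED
        treeMap.put(5L, 1L);
        System.out.println(classifyRetract(treeMap, 3L)); // NON_EXISTENT_RECORD
        System.out.println(classifyRetract(treeMap, 5L)); // DECREMENTED
    }
}
{code}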
Consider the following Flink SQL job:
{code:sql}
-- table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)
select
  a_key, a_time, a_jk, b_key, b_time, b_jk
from
(
  select
    a_key, a_time, a_jk, b_key, b_time, b_jk,
    row_number() over(partition by a_key order by a_time desc) as rn
  from
  (
    select a_key, a_time, a_jk
    from (
      select *, row_number() over(partition by a_key order by a_time desc) as rn
      from table_a
    ) tmp1
    where rn = 1
  ) t1
  left join
  (
    select b_key, b_time, b_jk
    from (
      select *, row_number() over(partition by b_key order by b_time desc) as rn
      from table_b
    ) tmp2
    where rn = 1
  ) t2
  on t1.a_jk = t2.b_jk
) t3
where rn = 1
{code}
The JobGraph looks like this (Rank_a and Rank_b both feed the Join):
{{Source table_a -> Rank_a -> Join -> Final Rank}}
{{Source table_b -> Rank_b -> Join}}
Suppose we have the following input:
||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
|t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
|t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
|t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
|t4|+(a1,4,jk1)| |-(a1,3,jk1), +(a1,4,jk1)| |-(a1,b1,3,jk1), +(a1,b1,4,jk1)|-(a1,b1,3), +(a1,b1,4)|
|t5|+(a1,5,jk2)| |-(a1,4,jk1), +(a1,5,jk2)| |-(a1,b1,4,jk1), +(a1,b2,5,jk2)|-(a1,b1,4), +(a1,b2,5)|
Assume that:
# t4 and t5 happen almost at the same time, and the Join operator produces 4 messages at t4 and t5. Because the hash key changes (jk1 -> jk2), three of them, {{-(a1,b1,3,jk1)}}, {{+(a1,b1,4,jk1)}} and {{-(a1,b1,4,jk1)}}, are handled by Join task1, while the last one, {{+(a1,b2,5,jk2)}}, may be handled by another task, task2.
# Due to network congestion, high machine load, etc., the messages produced at t4 and t5 take a while to arrive at Final Rank. By the time Final Rank receives them, its state has already expired because of State TTL.
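To illustrate why the arrival order at Final Rank is not fixed, here is a hypothetical sketch (not Flink code). It assumes Final Rank receives the Join output over one FIFO channel per Join task: order is kept within a channel but not across channels, so any merge that preserves each channel's own order can be observed. {{Interleavings}} and {{merges}} are illustrative names.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Enumerates every arrival order that keeps each channel's own (FIFO) order. */
public class Interleavings {

    static List<List<String>> merges(List<String> a, List<String> b) {
        List<List<String>> out = new ArrayList<>();
        merge(a, 0, b, 0, new ArrayList<>(), out);
        return out;
    }

    // Recursively take the next message from channel a or channel b.
    private static void merge(List<String> a, int i, List<String> b, int j,
                              List<String> prefix, List<List<String>> out) {
        if (i == a.size() && j == b.size()) {
            out.add(new ArrayList<>(prefix));
            return;
        }
        if (i < a.size()) {
            prefix.add(a.get(i));
            merge(a, i + 1, b, j, prefix, out);
            prefix.remove(prefix.size() - 1); // backtrack
        }
        if (j < b.size()) {
            prefix.add(b.get(j));
            merge(a, i, b, j + 1, prefix, out);
            prefix.remove(prefix.size() - 1); // backtrack
        }
    }

    public static void main(String[] args) {
        List<String> task1 = List.of("-(a1,b1,3,jk1)", "+(a1,b1,4,jk1)", "-(a1,b1,4,jk1)");
        List<String> task2 = List.of("+(a1,b2,5,jk2)");
        for (List<String> order : merges(task1, task2)) {
            System.out.println(order);
        }
    }
}
{code}

For the messages above this prints four possible orders; the last one delivers {{+(a1,b2,5,jk2)}} before {{-(a1,b1,3,jk1)}}.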
Now, if {{+(a1,b2,5,jk2)}} arrives at Final Rank first, sort key 5 is put into the sorted map of partition key a1. Then, when {{-(a1,b1,3,jk1)}} arrives at Final Rank, the sorted map is not empty and does not contain sort key 3, which meets both conditions for the {{RuntimeException}}.
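The sequence above can be replayed with a hypothetical model of the Final Rank state for partition key a1 (again not Flink code; {{FinalRankReplay}} and {{firstFailure}} are illustrative names). It assumes the state starts empty because of TTL expiry, tracks only the sort key {{a_time}}, and, as a simplification, silently skips a retraction that hits an empty map.

{code:java}
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Hypothetical replay of the Final Rank sorted map for partition key a1.
 * A message is written as "+5" (accumulate sort key 5) or "-3" (retract sort key 3).
 */
public class FinalRankReplay {

    /** Returns the first message that hits the "non-existent record" case, or null. */
    static String firstFailure(List<String> messages) {
        SortedMap<Long, Long> treeMap = new TreeMap<>(); // state expired by TTL: empty
        for (String msg : messages) {
            long sortKey = Long.parseLong(msg.substring(1));
            if (msg.charAt(0) == '+') {
                treeMap.merge(sortKey, 1L, Long::sum); // count records per sort key
            } else {
                Long count = treeMap.get(sortKey);
                if (count != null) {
                    if (count == 1L) {
                        treeMap.remove(sortKey);
                    } else {
                        treeMap.put(sortKey, count - 1);
                    }
                } else if (!treeMap.isEmpty()) {
                    return msg; // non-empty map without this sort key: the failing case
                }
                // Empty map and unknown key: treated as cleared state and skipped here.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The message from task2 overtakes the ones from task1.
        System.out.println(firstFailure(List.of("+5", "-3", "+4", "-4"))); // -3
        // The messages from task1 arrive first.
        System.out.println(firstFailure(List.of("-3", "+4", "-4", "+5"))); // null
    }
}
{code}

With {{+(a1,b2,5,jk2)}} first, the retraction of sort key 3 is the first message to hit the failing case; if task1's messages arrive first, none of them does.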
We hit this exception in our production environment. It is very serious because, once it happens, the job cannot recover automatically, as the state is polluted.