[ https://issues.apache.org/jira/browse/FLINK-26408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hengyu Dai updated FLINK-26408:
-------------------------------
    Description: 
When processing a retraction, RetractableTopNFunction throws a RuntimeException if both of the following hold:
 # the sorted map {color:#0747a6}ValueState<SortedMap<RowData, Long>> treeMap{color} is not empty, and
 # the sorted map does not contain the current sort key.
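
The two conditions can be expressed as a single predicate. The sketch below is a hypothetical illustration, not code from Flink. The class and method names are made up. A SortedMap<Long, Long> (sort key to record count) stands in for the treeMap state, with a plain Long sort key instead of RowData.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class RetractConditionSketch {
    // Hypothetical predicate mirroring the two conditions; not Flink code.
    // treeMap stands in for ValueState<SortedMap<RowData, Long>>:
    // sort key -> number of records, with a Long sort key instead of RowData.
    static boolean retractWouldThrow(SortedMap<Long, Long> treeMap, long sortKey) {
        return !treeMap.isEmpty()              // condition 1: the sorted map is not empty
            && !treeMap.containsKey(sortKey);  // condition 2: it lacks the current sort key
    }

    public static void main(String[] args) {
        SortedMap<Long, Long> treeMap = new TreeMap<>();
        System.out.println(retractWouldThrow(treeMap, 3L)); // false: map is empty
        treeMap.put(5L, 1L);
        System.out.println(retractWouldThrow(treeMap, 5L)); // false: key 5 is present
        System.out.println(retractWouldThrow(treeMap, 3L)); // true: both conditions hold
    }
}
```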

Consider the following Flink SQL job:
{code:sql}
-- table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)

select
a_key,a_time,a_jk,b_key,b_time,b_jk
from
(
    select
    a_key,a_time,a_jk,b_key,b_time,b_jk,
    row_number() over(partition by a_key order by a_time desc) as rn
    from
    (
        select a_key, a_time, a_jk
        from (
            select *, row_number() over(partition by a_key order by a_time desc) as rn
            from table_a
        ) tmp1
        where rn = 1
    ) t1
    left join
    (
        select b_key, b_time, b_jk
        from (
            select *, row_number() over(partition by b_key order by b_time desc) as rn
            from table_b
        ) tmp2
        where rn = 1
    ) t2
    on t1.a_jk = t2.b_jk
) t3
where rn = 1{code}
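The resulting JobGraph looks like this. Join is hash-partitioned by the join key (a_jk = b_jk), and Final Rank is hash-partitioned by a_key:

{noformat}
Source table_a --> Rank_a --+
                            +--> Join --> Final Rank
Source table_b --> Rank_b --+
{noformat}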

Suppose we have the following input:
||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
|t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
|t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
|t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
|t4|+(a1,4,jk1)| |-(a1,3,jk1) +(a1,4,jk1)| |-(a1,b1,3,jk1) +(a1,b1,4,jk1)|-(a1,b1,3) +(a1,b1,4)|
|t5|+(a1,5,jk2)| |-(a1,4,jk1) +(a1,5,jk2)| |-(a1,b1,4,jk1) +(a1,b2,5,jk2)|-(a1,b1,4) +(a1,b2,5)|
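
Read in table order, the Final Rank input for partition key a1 is +3, -3, +4, -4, +5 (by a_time). Every retraction therefore targets a sort key that is still present.

The hypothetical sketch below replays that sequence against a TreeMap standing in for treeMap. It is not Flink's implementation: Msg and replay are illustrative names, and no TTL is modelled, so any missing key is treated as an error. It reports the rank-1 a_time, which is the largest key because of ORDER BY a_time DESC.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

class InOrderReplaySketch {
    // Hypothetical changelog message: sign +1 = accumulate, -1 = retract; sort key is a_time.
    static final class Msg {
        final int sign;
        final long aTime;

        Msg(int sign, long aTime) {
            this.sign = sign;
            this.aTime = aTime;
        }
    }

    // Replays one partition key's messages against a TreeMap (sort key -> count)
    // and returns the rank-1 a_time, i.e. the largest key (ORDER BY a_time DESC).
    static long replay(List<Msg> msgs) {
        TreeMap<Long, Long> treeMap = new TreeMap<>();
        for (Msg m : msgs) {
            if (m.sign > 0) {
                treeMap.merge(m.aTime, 1L, Long::sum);
                continue;
            }
            Long count = treeMap.get(m.aTime);
            if (count == null) {
                throw new IllegalStateException("retracting missing sort key " + m.aTime);
            }
            if (count == 1L) {
                treeMap.remove(m.aTime);
            } else {
                treeMap.put(m.aTime, count - 1);
            }
        }
        return treeMap.lastKey();
    }

    public static void main(String[] args) {
        // Final Rank input for a1 in table order (t3, t4, t5).
        List<Msg> inOrder = Arrays.asList(
            new Msg(+1, 3), new Msg(-1, 3),
            new Msg(+1, 4), new Msg(-1, 4),
            new Msg(+1, 5));
        System.out.println(replay(inOrder)); // 5
    }
}
```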

 

Assume:
 # t4 and t5 happen almost at the same time, so the Join operator emits 4 messages for them. Because the hash key changes (from jk1 to jk2), +(a1,b2,5,jk2) (hashed by jk2) may be processed by a different Join task than the other 3 messages (hashed by jk1), and it may arrive at Final Rank earlier than they do.
 # Due to network congestion, high machine load, etc., the messages produced at t4 and t5 take a while to arrive at Final Rank. By the time Final Rank receives them, its state has expired because of State TTL and the treeMap state has been cleared.
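
Assumption 1 relies on two properties of keyed exchanges:
 # records with different keys may be handled by different subtasks, and
 # ordering is only guaranteed within one channel, not across channels.

The sketch below illustrates this under stated simplifications. subtaskFor is a hypothetical router using String.hashCode() modulo parallelism; Flink's real key-group assignment hashes differently. Each Join subtask's channel to the Final Rank subtask that owns a1 is modelled as a FIFO queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ChannelOrderSketch {
    // Hypothetical router: String.hashCode() modulo parallelism.
    // NOT Flink's key-group assignment, just enough to show keys can diverge.
    static int subtaskFor(String joinKey, int parallelism) {
        return Math.floorMod(joinKey.hashCode(), parallelism);
    }

    // One possible arrival order at Final Rank: the channel from fastSubtask
    // is drained first, then the others. Each channel stays FIFO.
    static List<String> arrivalOrder(List<Deque<String>> channels, int fastSubtask) {
        List<String> arrived = new ArrayList<>(channels.get(fastSubtask));
        for (int i = 0; i < channels.size(); i++) {
            if (i != fastSubtask) {
                arrived.addAll(channels.get(i));
            }
        }
        return arrived;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        List<Deque<String>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayDeque<>());
        }
        // Join output at t4 and t5, tagged with the join key that selected the
        // Join subtask; each message travels on that subtask's channel to Final Rank.
        String[][] joinOutput = {
            {"jk1", "-(a1,b1,3,jk1)"},
            {"jk1", "+(a1,b1,4,jk1)"},
            {"jk1", "-(a1,b1,4,jk1)"},
            {"jk2", "+(a1,b2,5,jk2)"},
        };
        for (String[] msg : joinOutput) {
            channels.get(subtaskFor(msg[0], parallelism)).addLast(msg[1]);
        }
        // With this router, jk1 and jk2 land on different subtasks.
        System.out.println(arrivalOrder(channels, subtaskFor("jk2", parallelism)));
    }
}
```

Draining the jk2 channel first is only one possible interleaving. The point is that nothing forbids it, while the three jk1 messages always keep their relative order.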

Now suppose +(a1,b2,5,jk2) arrives at Final Rank first. Sort key 5 is put into the sorted map of partition key a1. When -(a1,b1,3,jk1) arrives afterwards, Final Rank finds that the sorted map is not empty and does not contain sort key 3. Both conditions for the RuntimeException are now met.
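
Combining both assumptions reproduces the failure. In this hypothetical sketch (not Flink code), a TreeMap again stands in for the treeMap state of a1, and the TTL-cleared state is modelled as an empty map. Ignoring a retraction that hits an empty map is an assumption made only to match the two conditions above.

```java
import java.util.TreeMap;

class OutOfOrderFailureSketch {
    // Stand-in for Final Rank's treeMap for partition key a1: a_time -> count.
    // TTL-expired state is modelled as an empty map.
    private final TreeMap<Long, Long> treeMap = new TreeMap<>();

    void accumulate(long aTime) {
        treeMap.merge(aTime, 1L, Long::sum);
    }

    void retract(long aTime) {
        Long count = treeMap.get(aTime);
        if (count == null) {
            if (treeMap.isEmpty()) {
                // Assumption: a retraction hitting empty (cleared) state is ignored.
                return;
            }
            // Map not empty and sort key missing: both conditions hold.
            throw new RuntimeException("retract a non-existent record, sort key " + aTime);
        }
        if (count == 1L) {
            treeMap.remove(aTime);
        } else {
            treeMap.put(aTime, count - 1);
        }
    }

    public static void main(String[] args) {
        OutOfOrderFailureSketch finalRank = new OutOfOrderFailureSketch();
        finalRank.accumulate(5); // +(a1,b2,5,jk2) overtakes the jk1 messages
        try {
            finalRank.retract(3); // -(a1,b1,3,jk1): map is {5=1}, key 3 is missing
            System.out.println("no exception");
        } catch (RuntimeException e) {
            System.out.println("RuntimeException: " + e.getMessage());
        }
    }
}
```

Under the same assumption, the jk1 messages could arrive first and +5 last: -3 is ignored on the empty map, then +4 and -4 cancel out. That order would not throw, which is why the failure depends on the reordering.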

We hit this exception in our production environment (Flink version 1.12.2). It is very serious: when it happens, the job cannot recover automatically because the state is polluted.

  was:
RetractableTopNFunction will throw a RuntimeException when:
 # the sorted map {color:#0747a6}ValueState<SortedMap<RowData, Long>> treeMap{color} is not empty, and
 # the sorted map does not contain the current sort key.

Consider the following Flink SQL job:
{code:sql}
-- table_a(a_key, a_time, a_jk), table_b(b_key, b_time, b_jk)

select
a_key,a_time,a_jk,b_key,b_time,b_jk
from
(
    select
    a_key,a_time,a_jk,b_key,b_time,b_jk,
    row_number() over(partition by a_key order by a_time desc) as rn
    from
    (
        select a_key, a_time, a_jk
        from (
            select *, row_number() over(partition by a_key order by a_time desc) as rn
            from table_a
        ) tmp1
        where rn = 1
    ) t1
    left join
    (
        select b_key, b_time, b_jk
        from (
            select *, row_number() over(partition by b_key order by b_time desc) as rn
            from table_b
        ) tmp2
        where rn = 1
    ) t2
    on t1.a_jk = t2.b_jk
) t3
where rn = 1{code}
The JobGraph looks like this:

{noformat}
Source table_a --> Rank_a --+
                            +--> Join --> Final Rank
Source table_b --> Rank_b --+
{noformat}

Suppose we have the following input:

||ts||SourceA (a_key, a_time, a_jk)||SourceB (b_key, b_time, b_jk)||RankA (a_key, a_time, a_jk)||RankB (b_key, b_time, b_jk)||Join (a_key, b_key, a_time, a_jk)||Final Rank (a_key, b_key, a_time)||
|t1| |+(b1,1,jk1)| |+(b1,1,jk1)| | |
|t2| |+(b2,2,jk2)| |+(b2,2,jk2)| | |
|t3|+(a1,3,jk1)| |+(a1,3,jk1)| |+(a1,b1,3,jk1)|+(a1,b1,3)|
|t4|+(a1,4,jk1)| |-(a1,3,jk1) +(a1,4,jk1)| |-(a1,b1,3,jk1) +(a1,b1,4,jk1)|-(a1,b1,3) +(a1,b1,4)|
|t5|+(a1,5,jk2)| |-(a1,4,jk1) +(a1,5,jk2)| |-(a1,b1,4,jk1) +(a1,b2,5,jk2)|-(a1,b1,4) +(a1,b2,5)|

Assume:
 # t4 and t5 happen almost at the same time, so the Join operator emits 4 messages for them. Because the hash key changes (from jk1 to jk2), +(a1,b2,5,jk2) may be processed by a different task than the other 3 messages, and it may arrive earlier than they do.
 # Due to network congestion, high machine load, etc., the messages produced at t4 and t5 take a while to arrive at Final Rank. By the time Final Rank receives them, the state has expired because of State TTL.

Now suppose +(a1,b2,5,jk2) arrives at Final Rank first. Sort key 5 is put into the sorted map of partition key a1. When -(a1,b1,3,jk1) arrives afterwards, Final Rank finds that the sorted map is not empty and does not contain sort key 3. Both conditions for the RuntimeException are now met.

We hit this exception in our production environment (Flink version 1.12.2). It is very serious: when it happens, the job cannot recover automatically because the state is polluted.


> retract a non-existent record in RetractableTopNFunction 
> ---------------------------------------------------------
>
>                 Key: FLINK-26408
>                 URL: https://issues.apache.org/jira/browse/FLINK-26408
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner, Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: Hengyu Dai
>            Priority: Blocker



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
