[ 
https://issues.apache.org/jira/browse/FLINK-37747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17949356#comment-17949356
 ] 

David commented on FLINK-37747:
-------------------------------

Hey [~arvid], thanks for reviewing the PR. I have updated it accordingly. Do you 
think it is OK to merge now?

This issue also affects the 1.20.1 release. Are there guidelines for applying 
the fix to the 1.20.1 release? Thanks!

> GlobalCommitterOperator cannot commit after scaling writer/committer
> --------------------------------------------------------------------
>
>                 Key: FLINK-37747
>                 URL: https://issues.apache.org/jira/browse/FLINK-37747
>             Project: Flink
>          Issue Type: Bug
>            Reporter: David
>            Assignee: David
>            Priority: Blocker
>              Labels: pull-request-available
>
> Hey,
> Our Flink job frequently stopped writing into a Delta table when using the 
> Flink Delta connector. After investigating, we found that the 
> [commit|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L207]
>  function in GlobalCommitterOperator returns early when it checks whether a 
> checkpoint has finished (this 
> [code|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L211]).
>  The issue happens when:
>  * the auto-scaler scales up the chained writer/committer (the direct 
> upstream operator of GlobalCommitterOperator)
>  * the job first ran on a limited number of TMs with a lower writer/committer 
> parallelism, and the writer/committer was then scaled up to a higher 
> parallelism
> After debugging with additional logging, we found the root cause. For 
> example:
>  * checkpoint 3 completed successfully with 3 parallel writer/committer 
> subtasks
>  ** all committables in the writer/committer were saved into the checkpoint 
> state of checkpoint 3
>  * the writer/committer was scaled up to 5 parallel subtasks
>  * the writer/committer subtasks restore their state from checkpoint 3 and 
> re-emit the committables from checkpoint 3. The code is 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L139]
>  ** the CommittableSummary is stamped with the latest writer/committer 
> parallelism, which is 5. The code is 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L186]
>  * GlobalCommitterOperator receives the committable summaries from the 
> writer/committer, so it knows that:
>  ** there are 5 parallel upstream writer/committer subtasks
>  ** it must wait for committable summaries from all 5 of them
>  * only 3 writer/committer subtasks emit a CommittableSummary to the global 
> committer operator, because only 3 of them restored state from checkpoint 3
>  * the global committer operator is stuck forever, waiting for committable 
> summaries from 5 upstream subtasks
> We have a quick solution for this case and raised a PR to fix it. 
> We are using Flink 1.20, but we found that the issue still exists in the 
> master branch.
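
The stall described in the example above can be reduced to a small model. This is a hypothetical sketch and not Flink code. `GlobalCommitStallDemo`, `Summary`, `CheckpointTracker` and `simulate` are illustrative names, not Flink classes. The completeness rule is also a simplifying assumption based on the description: a checkpoint counts as complete once the number of distinct reporting subtasks equals the subtask count announced in the summaries. The second run, which stamps the summaries with the old parallelism, only shows the mismatch. It is not necessarily what the linked PR does.

```java
import java.util.HashSet;
import java.util.Set;

public class GlobalCommitStallDemo {

    // Hypothetical stand-in for a committable summary: the sending subtask,
    // the upstream subtask count it announces, and its checkpoint id.
    record Summary(int subtaskId, int numberOfSubtasks, long checkpointId) {}

    // Hypothetical tracker (simplified assumption): a checkpoint is complete
    // once a summary has arrived from every subtask the summaries announce.
    static final class CheckpointTracker {
        private final Set<Integer> reported = new HashSet<>();
        private int expectedSubtasks = -1;

        void add(Summary s) {
            // The expected count comes from the summary itself.
            expectedSubtasks = s.numberOfSubtasks();
            reported.add(s.subtaskId());
        }

        boolean isComplete() {
            return expectedSubtasks > 0 && reported.size() == expectedSubtasks;
        }
    }

    // Models recovery: `restoredSubtasks` subtasks re-emit a summary for the
    // restored checkpoint 3, each stamped with `stampedParallelism`.
    static boolean simulate(int restoredSubtasks, int stampedParallelism) {
        CheckpointTracker tracker = new CheckpointTracker();
        for (int subtask = 0; subtask < restoredSubtasks; subtask++) {
            tracker.add(new Summary(subtask, stampedParallelism, 3L));
        }
        return tracker.isComplete();
    }

    public static void main(String[] args) {
        // Scenario from the report: 3 subtasks restore checkpoint 3 after the
        // rescale to 5 and stamp their summaries with 5, so the check never passes.
        System.out.println("stamped with 5: complete = " + simulate(3, 5)); // false
        // Contrast: stamped with the parallelism at checkpoint time (3).
        System.out.println("stamped with 3: complete = " + simulate(3, 3)); // true
    }
}
```

In this model, no further summaries for checkpoint 3 ever arrive once the 3 restored subtasks have reported. The first tracker therefore stays incomplete indefinitely, which matches the hang described in the report.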



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
