[ 
https://issues.apache.org/jira/browse/FLINK-37747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18032773#comment-18032773
 ] 

Eric Nascimento commented on FLINK-37747:
-----------------------------------------

Hello,

We are once again facing an issue where our Flink job stopped writing to 
Delta. As in FLINK-37747, the GlobalCommitter was not being called.
Our application appears to be stuck on a checkpoint ({*}873{*}) whose state is 
inconsistent: the number of *subtasks* does not equal the number of 
{*}subtask committable managers{*}.

During debugging, we identified that the *GlobalCommitter* is not being 
triggered because of a failing [condition 
check|https://github.com/apache/flink/blob/87f898ec800c53e1c996634ca0a1f64bc79ecfca/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java#L138].
 The condition expects these values to be equal.


However, these values do not match, which prevents the GlobalCommitter from 
executing. We did have a few restarts while this checkpoint was being taken (see 
logs), so I'm not sure whether that's the culprit.
Unlike FLINK-37747, this one is harder to reproduce: just scaling up 
the application is not enough.
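
To make the failing condition concrete, here is a minimal sketch of how I understand the check. It is a simplified model, not the actual Flink code: the class {{CommitGateSketch}}, the method {{allSubtasksReported}}, and the map of per-subtask flags are names made up for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of the commit gate; NOT Flink's real class. */
public class CommitGateSketch {

    /**
     * Returns true only when one manager exists per expected subtask
     * and every one of those managers has received all its committables.
     *
     * @param numberOfSubtasks expected upstream parallelism (assumed input)
     * @param receivedAllBySubtask subtask id -> "has received everything" flag
     */
    public static boolean allSubtasksReported(
            int numberOfSubtasks, Map<Integer, Boolean> receivedAllBySubtask) {
        return receivedAllBySubtask.size() == numberOfSubtasks
                && receivedAllBySubtask.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        // Three subtasks reported, each one complete.
        Map<Integer, Boolean> managers = new HashMap<>();
        managers.put(0, true);
        managers.put(1, true);
        managers.put(2, true);

        // Counts match: the gate opens.
        System.out.println(allSubtasksReported(3, managers)); // prints "true"

        // Five subtasks expected but only three managers: the gate stays closed.
        System.out.println(allSubtasksReported(5, managers)); // prints "false"
    }
}
```

In this model, a mismatch between the two counts keeps the gate closed no matter how complete each individual manager is, which matches the stuck behaviour we observe.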

 

It looks like a very specific edge case causes the mismatching 
state. Any ideas or suggestions? Could it be that we also need to add a check 
similar to the one from FLINK-37747 on the 
CommittableCollectorSerializer? I'm honestly not sure how to proceed here.

--
*Flink version: 1.20.3*
*Checkpoint failure logs:*
{noformat}
2025-10-17 22:55:17.553 2025-10-17 20:55:17,553 [Delta Sink: transactions: 
Global Committer (1/1)#4] WARN  
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
 [] - Delta Sink: transactions: Global Committer (1/1)#4 
(0c370242facfda1d112e94882bcbd052_5991bfefb6d30e6487b8f3e5342389c8_0_4): 
Received checkpoint barrier for checkpoint 874 before completing current 
checkpoint 873. Skipping current checkpoint.
2025-10-17 22:51:46.971 2025-10-17 20:51:46,971 [Delta Sink: denied-purchases: 
Global Committer (1/1)#4] WARN  
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
 [] - Delta Sink: denied-purchases: Global Committer (1/1)#4 
(0c370242facfda1d112e94882bcbd052_a858192fe71e42efe3da125ec5940cc3_0_4): 
Received checkpoint barrier for checkpoint 874 before completing current 
checkpoint 873. Skipping current checkpoint.
2025-10-17 22:51:46.971 2025-10-17 20:51:46,971 [Delta Sink: fees: Global 
Committer (1/1)#4] WARN  
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
 [] - Delta Sink: fees: Global Committer (1/1)#4 
(0c370242facfda1d112e94882bcbd052_2c03ec2835e560f8b6522a2c8cc8ed7a_0_4): 
Received checkpoint barrier for checkpoint 874 before completing current 
checkpoint 873. Skipping current checkpoint.
2025-10-17 22:51:45.104 2025-10-17 20:51:45,103 [Checkpoint Timer] WARN  
org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
trigger or complete checkpoint 873 for job 50ac0f996fb31d6aef83ca9c8738a401. (0 
consecutive failed attempts so far)
2025-10-17 22:51:45.104 2025-10-17 20:51:45,104 [SourceCoordinator-Source: 
datomic-single-split] INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
checkpoint 873 as aborted for source Source: datomic-single-split.
2025-10-17 22:51:00.103 2025-10-17 20:51:00,102 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
checkpoint 873 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, 
formatType=CANONICAL}) @ 1760734260075 for job 
50ac0f996fb31d6aef83ca9c8738a401{noformat}
 

> GlobalCommitterOperator cannot commit after scaling writer/committer
> --------------------------------------------------------------------
>
>                 Key: FLINK-37747
>                 URL: https://issues.apache.org/jira/browse/FLINK-37747
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 2.0.0, 1.19.2, 1.20.1, 2.1.0
>            Reporter: David
>            Assignee: David
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 2.1.0, 2.0.1, 1.20.3
>
>
> Hey,
> Our Flink job frequently stopped writing to a Delta table through the Flink 
> Delta connector. After investigating, we found that in GlobalCommitterOperator, the 
> [commit|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L207]
>  function returns early when checking whether a checkpoint has 
> finished (this 
> [code|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L211]).
>  The issue occurred when:
>  * the auto-scaler scaled up the chained writer/committer (the direct upstream 
> operator of GlobalCommitterOperator)
>  * the job first ran on a limited number of TMs with lower parallelism for the 
> writer/committer, and then the writer/committer was scaled up to a higher parallelism
> After debugging with more logs, we found the cause of the issue. An example 
> is:
>  * for checkpoint 3, FLINK job completed successfully with 3 writer/committer 
> in parallel
>  ** All committable objects in writer/committer were saved into checkpoint 
> state in checkpoint 3
>  * writer/committer was scaled up to 5 parallel tasks
>  * writer/committer restore state from checkpoint 3, they will emit 
> committable objects from checkpoint 3. code is 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L139]
>  ** latest parallelism of writer/committer is used, which is 5 in 
> CommittableSummary. Code is 
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L186]
>  * GlobalCommitterOperator receives the committable summary from the 
> writer/committer, so it knows:
>  ** 5 parallel writer/committer from upstreams
>  ** it will look for committable summary from 5 upstream writer/committer
>  * 3 writer/committers emit CommittableSummary to global committer operator 
> as only 3 restore state from checkpoint 3
>  * Global committer operator stuck here forever as it looks for committable 
> summary for 5 subtasks from upstream operator
> We have a quick fix for this case and are raising a PR for it. 
> We are using Flink 1.20, but we found that the issue still exists on the master 
> branch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
