[
https://issues.apache.org/jira/browse/FLINK-37747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18032773#comment-18032773
]
Eric Nascimento edited comment on FLINK-37747 at 10/24/25 2:01 PM:
-------------------------------------------------------------------
Hello,
We are once again facing an issue where our Flink job stopped writing to
Delta. As in FLINK-37747, the GlobalCommitter was not being called.
It seems that our application got stuck on a checkpoint ({*}873{*}) with
mismatched state: the number of *subtasks (2)* does not equal the number
of *subtask committable managers (1).*
During debugging, we identified that the *GlobalCommitter* is not being
triggered because of a failing [condition
check|https://github.com/apache/flink/blob/87f898ec800c53e1c996634ca0a1f64bc79ecfca/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java#L138].
The condition expects these values to be equal; since they do not match, the
GlobalCommitter is prevented from executing. We did have a few restarts while
this checkpoint was being taken (see logs), so I'm not sure whether that's the
culprit.
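For context, the invariant behind the failing check can be modeled as a small sketch. This is a simplified, hypothetical model, not Flink's actual CheckpointCommittableManagerImpl; the class and method names below are illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the invariant enforced before the GlobalCommitter
// may commit a checkpoint: the number of registered subtask committable
// managers must equal the number of subtasks announced upstream.
class CheckpointCommittableState {
    private final int numberOfSubtasks;                              // e.g. 2 in our job
    private final Map<Integer, Integer> managers = new HashMap<>();  // subtaskId -> pending committables

    CheckpointCommittableState(int numberOfSubtasks) {
        this.numberOfSubtasks = numberOfSubtasks;
    }

    void registerSubtask(int subtaskId, int pendingCommittables) {
        managers.put(subtaskId, pendingCommittables);
    }

    // Mirrors the failing condition check: committing is only allowed once
    // every announced subtask has contributed a committable manager.
    boolean mayCommit() {
        return managers.size() == numberOfSubtasks;
    }
}

public class InvariantDemo {
    public static void main(String[] args) {
        // Checkpoint 873 in our job: 2 subtasks announced, but only 1
        // subtask committable manager present, so the commit is blocked.
        CheckpointCommittableState cp873 = new CheckpointCommittableState(2);
        cp873.registerSubtask(0, 5);
        System.out.println(cp873.mayCommit()); // prints "false"
    }
}
```

In our stuck checkpoint the manager count never catches up to the announced subtask count, so this condition stays false forever.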
Unlike FLINK-37747, this issue is harder to reproduce; simply scaling up the
application is not enough.
It looks like a very specific edge case causes the mismatched state. Any ideas
or suggestions? Could it be that we also need a check on the
CommittableCollectorSerializer similar to the one added for FLINK-37747? I'm
honestly not sure how to proceed here.
--
*Flink version: 1.20.3*
*Checkpoint failure logs:*
{noformat}
2025-10-17 22:55:17.553 2025-10-17 20:55:17,553 [Delta Sink: transactions:
Global Committer (1/1)#4] WARN
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
[] - Delta Sink: transactions: Global Committer (1/1)#4
(0c370242facfda1d112e94882bcbd052_5991bfefb6d30e6487b8f3e5342389c8_0_4):
Received checkpoint barrier for checkpoint 874 before completing current
checkpoint 873. Skipping current checkpoint.
2025-10-17 22:51:46.971 2025-10-17 20:51:46,971 [Delta Sink: denied-purchases:
Global Committer (1/1)#4] WARN
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
[] - Delta Sink: denied-purchases: Global Committer (1/1)#4
(0c370242facfda1d112e94882bcbd052_a858192fe71e42efe3da125ec5940cc3_0_4):
Received checkpoint barrier for checkpoint 874 before completing current
checkpoint 873. Skipping current checkpoint.
2025-10-17 22:51:46.971 2025-10-17 20:51:46,971 [Delta Sink: fees: Global
Committer (1/1)#4] WARN
org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler
[] - Delta Sink: fees: Global Committer (1/1)#4
(0c370242facfda1d112e94882bcbd052_2c03ec2835e560f8b6522a2c8cc8ed7a_0_4):
Received checkpoint barrier for checkpoint 874 before completing current
checkpoint 873. Skipping current checkpoint.
2025-10-17 22:51:45.104 2025-10-17 20:51:45,103 [Checkpoint Timer] WARN
org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to
trigger or complete checkpoint 873 for job 50ac0f996fb31d6aef83ca9c8738a401. (0
consecutive failed attempts so far)
2025-10-17 22:51:45.104 2025-10-17 20:51:45,104 [SourceCoordinator-Source:
datomic-single-split] INFO
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
checkpoint 873 as aborted for source Source: datomic-single-split.
2025-10-17 22:51:00.103 2025-10-17 20:51:00,102 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 873 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE,
formatType=CANONICAL}) @ 1760734260075 for job
50ac0f996fb31d6aef83ca9c8738a401{noformat}
> GlobalCommitterOperator cannot commit after scaling writer/committer
> --------------------------------------------------------------------
>
> Key: FLINK-37747
> URL: https://issues.apache.org/jira/browse/FLINK-37747
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 2.0.0, 1.19.2, 1.20.1, 2.1.0
> Reporter: David
> Assignee: David
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 2.1.0, 2.0.1, 1.20.3
>
>
> Hey,
> Our Flink job frequently stopped writing into a Delta table via the Flink
> Delta connector. After investigating, we found that the
> [commit|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L207]
> function in GlobalCommitterOperator returns early when checking whether a
> checkpoint has finished (this
> [code|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java#L211]).
> The issue happened when:
> * the auto-scaler scales up the chained writer/committer (the direct
> upstream operator of GlobalCommitterOperator)
> * the job first ran on limited TMs with lower writer/committer parallelism,
> and the writer/committer was then scaled up to higher parallelism
> After debugging with more logs, we found the cause of the issue. An example
> is:
> * for checkpoint 3, the Flink job completed successfully with 3 parallel
> writer/committer tasks
> ** all committable objects in the writer/committers were saved into
> checkpoint state at checkpoint 3
> * the writer/committer was scaled up to 5 parallel tasks
> * the writer/committer tasks restored state from checkpoint 3 and re-emitted
> the committable objects from checkpoint 3; the code is
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L139]
> ** the latest writer/committer parallelism (5) is used in the
> CommittableSummary; the code is
> [here|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L186]
> * when the GlobalCommitterOperator received the committable summaries from
> the writer/committers, it learned:
> ** there are 5 parallel writer/committers upstream
> ** it should therefore wait for committable summaries from 5 upstream
> writer/committers
> * only 3 writer/committers emitted a CommittableSummary to the global
> committer operator, because only 3 of them restored state from checkpoint 3
> * the GlobalCommitterOperator is stuck forever, waiting for committable
> summaries from 5 upstream subtasks
> We have a quick solution for this case and raised a PR to fix it.
> We are using Flink 1.20, but we found the issue still exists on the master
> branch.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)