[
https://issues.apache.org/jira/browse/FLINK-38433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18052806#comment-18052806
]
Lucas Borges commented on FLINK-38433:
--------------------------------------
Hey [~zakelly] and [~AlexYinHan]! Thank you for the patches! (Let me know if I
should create a separate issue for this.)
I ran into an issue very similar to this one and to the earlier
https://issues.apache.org/jira/browse/FLINK-38324, this time running Flink
2.2.0. However, my application had (on purpose, to test ForSt) a large
state (around 3 TB full size and 400 GB increment size) and was under heavy
backpressure at the time. Because of that, I had also enabled unaligned checkpoints.
The app was running fine until we hit the following error on some
subtasks of our async state operator. From that point onwards, the Flink
deployment stayed in a restart loop indefinitely, with other similar errors.
Here is a CSV file with the errors from around the time the failure started, if it
helps: [^extract-2026-01-19T09_16_55.235Z.csv]
java.io.IOException: Could not perform checkpoint 47 for operator store-resources (143/384)#2.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1416)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
    at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:182)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:160)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:171)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:646)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:988)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:925)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:955)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 47 for operator store-resources (143/384)#2. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:318)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:202)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:411)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:752)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:362)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1459)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1447)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1404)
    ... 21 more
Caused by: java.io.FileNotFoundException: File does not exist: s3p://dd-flink-state-us1-prod-dog/_entropy_/flink-2-smoke-testing-job/checkpoints/5468f75c04d8e0e48bfba281f4e96e7e/shared/op_AsyncStreamFlatMap_27207667b5bbbc907c6d8368252fc5d1__143_384__attempt_2/db/e6ba00ab-884c-41de-89c2-4b59f72e6fe3
    at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:361)
    at org.apache.flink.fs.s3presto.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
    at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:340)
    at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:376)
    at org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
    at org.forstdb.RocksDB.enableFileDeletions(Native Method)
    at org.forstdb.RocksDB.enableFileDeletions(RocksDB.java:4312)
    at org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.syncPrepareResources(ForStNativeFullSnapshotStrategy.java:207)
    at org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.syncPrepareResources(ForStNativeFullSnapshotStrategy.java:64)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
    at org.apache.flink.state.forst.ForStKeyedStateBackend.snapshot(ForStKeyedStateBackend.java:543)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:278)
    ... 32 more
> Avoid delete ForSt's directory when there happened to be an existing one
> ------------------------------------------------------------------------
>
> Key: FLINK-38433
> URL: https://issues.apache.org/jira/browse/FLINK-38433
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Zakelly Lan
> Assignee: Han Yin
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.1, 2.2.0, 2.1.1
>
> Attachments: extract-2026-01-19T09_16_55.235Z.csv
>
>
> Currently, the ForSt state backend creates its working directory, and if a
> directory with the same name already exists, that directory is cleared. The
> name contains the job ID, vertex ID, task index, parallelism and attempt
> number. In a JM failover scenario this causes a collision, because the
> attempt number restarts from 0, which results in a broken checkpoint.
>
> We should remove the logic that clears an existing directory.
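
The collision described above can be illustrated with a minimal sketch. The `workingDirName` helper and its name layout are assumptions modeled on the directory name visible in the stack trace path (`op_AsyncStreamFlatMap_..._143_384__attempt_2`); this is not ForSt's actual code, and the job ID component is omitted for brevity.

```java
import java.util.HashSet;
import java.util.Set;

public class WorkingDirCollision {

    // Hypothetical helper: mimics the directory name seen in the stack trace.
    // It is an illustration only, not the ForSt implementation.
    static String workingDirName(
            String operatorName, String vertexId, int subtask, int parallelism, int attempt) {
        return String.format(
                "op_%s_%s__%d_%d__attempt_%d",
                operatorName, vertexId, subtask, parallelism, attempt);
    }

    public static void main(String[] args) {
        String vertexId = "27207667b5bbbc907c6d8368252fc5d1";
        Set<String> existingDirs = new HashSet<>();

        // Before the JM failover: attempt 0 creates its working directory,
        // and completed checkpoints may reference files inside it.
        String beforeFailover = workingDirName("AsyncStreamFlatMap", vertexId, 143, 384, 0);
        existingDirs.add(beforeFailover);

        // After the JM failover the attempt number restarts from 0,
        // so the same name is generated again.
        String afterFailover = workingDirName("AsyncStreamFlatMap", vertexId, 143, 384, 0);

        // If the backend clears a directory that already exists, files that an
        // earlier checkpoint still references would be deleted at this point.
        System.out.println("collision: " + existingDirs.contains(afterFailover));
    }
}
```

Because every component of the name is identical after the attempt counter resets, a "clear if exists" step would remove the directory that earlier checkpoints still point to.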
--
This message was sent by Atlassian Jira
(v8.20.10#820010)