[ https://issues.apache.org/jira/browse/FLINK-38433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18052806#comment-18052806 ]

Lucas Borges commented on FLINK-38433:
--------------------------------------

Hey [~zakelly] and [~AlexYinHan]! Thank you for the patches! (Let me know if I 
should create a separate issue for this.)

I ran into an issue very similar to this one and to the earlier 
https://issues.apache.org/jira/browse/FLINK-38324, this time on Flink 
2.2.0. However, my application had (deliberately, to stress-test ForSt) a 
large state (around 3 TB full size, 400 GB incremental size) and was under 
heavy backpressure at the time. Because of the backpressure, I had also 
enabled unaligned checkpoints.

The app was running fine until we hit the following errors on some 
subtasks of our async-state operator. From that point on, the Flink 
deployment entered an endless restart loop, failing repeatedly with similar 
errors.

Here is a CSV file with the errors from around the time the failure started, 
in case it helps: [^extract-2026-01-19T09_16_55.235Z.csv]

{code}
java.io.IOException: Could not perform checkpoint 47 for operator store-resources (143/384)#2.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1416)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
	at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:182)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:160)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:171)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:646)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:988)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:925)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:973)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:955)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:760)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 47 for operator store-resources (143/384)#2. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:318)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:202)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:411)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:752)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:362)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$18(StreamTask.java:1459)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1447)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1404)
	... 21 more
Caused by: java.io.FileNotFoundException: File does not exist: s3p://dd-flink-state-us1-prod-dog/_entropy_/flink-2-smoke-testing-job/checkpoints/5468f75c04d8e0e48bfba281f4e96e7e/shared/op_AsyncStreamFlatMap_27207667b5bbbc907c6d8368252fc5d1__143_384__attempt_2/db/e6ba00ab-884c-41de-89c2-4b59f72e6fe3
	at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:361)
	at org.apache.flink.fs.s3presto.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:88)
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.getFileStatus(PluginFileSystemFactory.java:106)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:78)
	at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.getFileStatus(ForStFlinkFileSystem.java:340)
	at org.apache.flink.state.forst.fs.ForStFlinkFileSystem.listStatus(ForStFlinkFileSystem.java:376)
	at org.apache.flink.state.forst.fs.StringifiedForStFileSystem.listStatus(StringifiedForStFileSystem.java:52)
	at org.forstdb.RocksDB.enableFileDeletions(Native Method)
	at org.forstdb.RocksDB.enableFileDeletions(RocksDB.java:4312)
	at org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.syncPrepareResources(ForStNativeFullSnapshotStrategy.java:207)
	at org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy.syncPrepareResources(ForStNativeFullSnapshotStrategy.java:64)
	at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
	at org.apache.flink.state.forst.ForStKeyedStateBackend.snapshot(ForStKeyedStateBackend.java:543)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:278)
	... 32 more
{code}

> Avoid delete ForSt's directory when there happened to be an existing one
> ------------------------------------------------------------------------
>
>                 Key: FLINK-38433
>                 URL: https://issues.apache.org/jira/browse/FLINK-38433
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Zakelly Lan
>            Assignee: Han Yin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.0.1, 2.2.0, 2.1.1
>
>         Attachments: extract-2026-01-19T09_16_55.235Z.csv
>
>
> Currently, the ForSt state backend creates its working directory on 
> startup; if a directory with the same name already exists, it is cleared. 
> The name contains the jobid, vertexid, task_index, parallelism and 
> attempt. In a JM failover scenario this causes a collision, since the 
> attempt number restarts from 0, resulting in broken checkpoints.
>  
> We should remove the logic that clears an existing directory.
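
The collision described above can be illustrated with a minimal sketch. The naming scheme below is a hypothetical reconstruction based on the path visible in the stack trace (op_<vertexId>__<subtask>_<parallelism>__attempt_<n>); class and method names are made up for illustration, not Flink's actual code:

```java
// Hypothetical sketch of the ForSt working-directory collision after a
// JobManager failover. The attempt counter restarts from 0, so a restarted
// subtask can eventually compute the same directory name as a previous run.
class ForStDirCollisionSketch {

    // Directory name shaped like the one in the stack trace:
    // op_<vertexId>__<subtaskIndex>_<parallelism>__attempt_<attempt>
    static String workingDir(String vertexId, int subtask, int parallelism, int attempt) {
        return String.format("op_%s__%d_%d__attempt_%d", vertexId, subtask, parallelism, attempt);
    }

    public static void main(String[] args) {
        // Before failover: subtask 143/384 on its attempt 2.
        String before = workingDir("27207667b5bbbc907c6d8368252fc5d1", 143, 384, 2);
        // After JM failover the attempt counter restarts from 0; once the task
        // has restarted twice it reaches attempt 2 again and derives the
        // identical directory name.
        String after = workingDir("27207667b5bbbc907c6d8368252fc5d1", 143, 384, 2);
        // Collision: the old behavior cleared this directory on startup,
        // deleting files that completed checkpoints still referenced.
        System.out.println(before.equals(after)); // prints "true"
    }
}
```

With the fix, the backend no longer clears a pre-existing directory, so a name collision can no longer delete files that earlier checkpoints still point to.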



--
This message was sent by Atlassian Jira
(v8.20.10#820010)