[ https://issues.apache.org/jira/browse/FLINK-31560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705048#comment-17705048 ]
Yanfei Lei commented on FLINK-31560:
------------------------------------
[~Brian Zhou] Thanks for sharing. I tried to reproduce this on my local
standalone cluster (Flink 1.16.1), but failed. Here are my settings; do they
differ from your job's?
* checkpoint interval: 60s
* state backend: RocksDB
* incremental checkpoints: enabled
* job: examples/streaming/SocketWindowWordCount.jar
* savepoint request (a Java sketch of the equivalent job settings follows the request below):
{code:java}
curl -d "@req.json" -H "Content-Type: application/json" -X POST http://localhost:8081/jobs/xxxxx/savepoints

req.json:
{
  "cancel-job" : true,
  "formatType" : "CANONICAL",
  "target-directory" : "file:///Users/leiyanfei/flink-1.16.1/sp"
}
{code}
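For reference, a minimal Java sketch of the job settings listed above (my assumed wiring, not code from this ticket; the class name and pipeline are made up):
{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReproSettings {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // 60s checkpoint interval
        // 'true' enables incremental checkpoints for the RocksDB state backend
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // ... SocketWindowWordCount-style source/window/reduce pipeline here ...
        env.execute("repro");
    }
}
{code}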
> I want to ask if by default it is canonical savepoint.
Yes, the default savepoint format type is canonical; the following lines in
JM.log also confirm that your savepoints are canonical.
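If you want to set the format explicitly, the client API also takes it as a parameter. A minimal sketch, assuming the Flink 1.16 RestClusterClient and SavepointFormatType APIs (the job id is the one from your log; host, port and target path are placeholders):
{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.SavepointFormatType;

public class StopWithSavepoint {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set(RestOptions.ADDRESS, "localhost");
        conf.set(RestOptions.PORT, 8081);
        try (RestClusterClient<String> client = new RestClusterClient<>(conf, "standalone")) {
            // Equivalent of the REST request above: cancel the job and
            // write a CANONICAL-format savepoint to the target directory.
            String path = client.stopWithSavepoint(
                    JobID.fromHexString("7354442cd6f7c121249360680c04284d"),
                    false, // do not advance event time to end (no drain)
                    "file:///tmp/sp",
                    SavepointFormatType.CANONICAL)
                .get();
            System.out.println("Savepoint written to " + path);
        }
    }
}
{code}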
{code:java}
2023-03-23 06:11:58,125 [flink-akka.actor.default-dispatcher-21] INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering cancel-with-savepoint for job 7354442cd6f7c121249360680c04284d.
2023-03-23 06:11:58,149 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2444 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1679551918125 for job 7354442cd6f7c121249360680c04284d.
{code}
Normally, the handle corresponding to a savepoint is
[KeyGroupsSavepointStateHandle|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java#L82],
and a placeholder handle should not appear. Could you please share the TM log
of the RocksDB state backend? I would like to know the type of stream handle
the TM reports to the JM.
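For context, a simplified sketch (my paraphrase, not the actual Flink source) of the type dispatch in MetadataV2V3SerializerBase#serializeStreamStateHandle that produces the failure in your trace:
{code:java}
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

class SerializerSketch {
    static void serializeStreamStateHandle(StreamStateHandle handle, DataOutputStream dos)
            throws IOException {
        if (handle instanceof FileStateHandle) {
            // write a type tag, then the file path and state size
        } else if (handle instanceof ByteStreamStateHandle) {
            // write a type tag, then the handle name and the inline bytes
        } else {
            // PlaceholderStreamStateHandle has no branch here, so a placeholder
            // that leaks into savepoint metadata fails exactly as in the trace:
            throw new IOException(
                    "Unknown implementation of StreamStateHandle: " + handle.getClass());
        }
    }
}
{code}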
> Savepoint failing to complete with incremental RocksDB statebackend
> -------------------------------------------------------------------
>
> Key: FLINK-31560
> URL: https://issues.apache.org/jira/browse/FLINK-31560
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.16.0
> Reporter: Fan Yang
> Priority: Major
> Attachments: image-2023-03-23-18-03-05-943.png,
> image-2023-03-23-18-19-24-482.png, jobmanager_log.txt
>
>
> Flink version: 1.16.0
>
> We are using Flink to run some streaming applications with Pravega as the
> source, using window and reduce transformations. We use the RocksDB state
> backend with incremental checkpointing enabled. We do not enable the latest
> changelog state backend.
> When we try to stop the job, the savepoint fails to complete most of the
> time. On rare occasions, the job gets canceled suddenly and its savepoint
> completes successfully.
> Savepointing shows the error below:
>
> {code:java}
> 2023-03-22 08:55:57,521 [jobmanager-io-thread-1] WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 189 for job 7354442cd6f7c121249360680c04284d. (0 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.16.0.jar:1.16.0]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: class org.apache.flink.runtime.state.PlaceholderStreamStateHandle
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:699) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandleMap(MetadataV2V3SerializerBase.java:813) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateHandle(MetadataV2V3SerializerBase.java:344) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateCol(MetadataV2V3SerializerBase.java:269) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeSubtaskState(MetadataV2V3SerializerBase.java:262) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeSubtaskState(MetadataV3Serializer.java:142) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:122) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:146) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:87) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:82) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:333) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361) ~[flink-dist-1.16.0.jar:1.16.0]
>     ... 7 more
> {code}
>
> Prior to Flink 1.16, we did not observe this error. Since
> `PlaceholderStreamStateHandle` indicates reusable RocksDB data for an
> incremental checkpoint, we believe the incremental-checkpoint improvements
> introduced in the Flink 1.16 release might be related to this issue; our
> understanding of the reuse mechanism is sketched below.
> We would appreciate assistance in investigating this issue and finding a solution.
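> Our understanding of the reuse mechanism, as a minimal sketch (an illustration under our assumptions, not actual Flink source; the class and keying are made up):
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
> import org.apache.flink.runtime.state.StreamStateHandle;
>
> // Hypothetical registry keyed by the shared state key (e.g. an SST file id).
> class SharedStateSketch {
>     private final Map<String, StreamStateHandle> registered = new HashMap<>();
>
>     StreamStateHandle resolve(String key, StreamStateHandle reported) {
>         if (reported instanceof PlaceholderStreamStateHandle) {
>             // The TM skipped re-uploading this file; reuse the handle that an
>             // earlier incremental checkpoint registered. If this resolution is
>             // skipped, the placeholder leaks into the metadata serializer.
>             return registered.get(key);
>         }
>         registered.put(key, reported);
>         return reported;
>     }
> }
> {code}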