[ 
https://issues.apache.org/jira/browse/FLINK-31560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705048#comment-17705048
 ] 

Yanfei Lei commented on FLINK-31560:
------------------------------------

[~Brian Zhou] Thanks for sharing, I tried to reproduce it in my local 
standalone cluster(Flink 1.16.1), but I failed. Here are my settings, is it 
different from your job?
 * checkpoint interval: 60s
 * state backend: Rocksdb state backend
 * enable incremental checkpoint
 * job: examples/streaming/SocketWindowWordCount.jar
 * savepoint request:

 
{code:java}
curl -d "@req.json" -H "Content-Type: application/json"  -X POST 
http://localhost:8081/jobs/xxxxx/savepoints 

req.json:
{
    "cancel-job" : true,
    "formatType" : "CANONICAL",
    "target-directory" : "file:///Users/leiyanfei/flink-1.16.1/sp"
}{code}
 

> I want to ask if by default it is canonical savepoint.

The default format type of savepoint is canonical, the following log in JM.log 
also confirms that your savepoints are canonical. 

 
{code:java}
2023-03-23 06:11:58,125 [flink-akka.actor.default-dispatcher-21] INFO  
org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering 
cancel-with-savepoint for job 7354442cd6f7c121249360680c04284d.
2023-03-23 06:11:58,149 [Checkpoint Timer] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
checkpoint 2444 (type=SavepointType{name='Savepoint', 
postCheckpointAction=NONE, formatType=CANONICAL}) @ 1679551918125 for job 
7354442cd6f7c121249360680c04284d. {code}
 

Normally, the handle corresponding to savepoint is 
[KeyGroupsSavepointStateHandle|[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/FullSnapshotAsyncWriter.java#L82]],
 and the placeholder handle should not exist, could you please share the TM log 
of rocksdb state backend? I want to know the type of stream handle reported by 
TM to JM?

 

> Savepoint failing to complete with incremental RocksDB statebackend
> -------------------------------------------------------------------
>
>                 Key: FLINK-31560
>                 URL: https://issues.apache.org/jira/browse/FLINK-31560
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.16.0
>            Reporter: Fan Yang
>            Priority: Major
>         Attachments: image-2023-03-23-18-03-05-943.png, 
> image-2023-03-23-18-19-24-482.png, jobmanager_log.txt
>
>
> Flink version: 1.16.0
>  
> We are using Flink to run some streaming applications with Pravega as source 
> and use window and reduce transformations. We use RocksDB state backend with 
> incremental checkpointing enabled. We don't enable the latest changelog state 
> backend.
> When we try to stop the job, we encounter issues with the savepoint failing 
> to complete for the job. This happens most of the time. On rare occasions, 
> the job gets canceled suddenly with its savepoint get completed successfully.
> Savepointing shows below error:
>  
> {code:java}
> 2023-03-22 08:55:57,521 [jobmanager-io-thread-1] WARN  
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to 
> trigger or complete checkpoint 189 for job 7354442cd6f7c121249360680c04284d. 
> (0 consecutive failed attempts so 
> far)org.apache.flink.runtime.checkpoint.CheckpointException: Failure to 
> finalize checkpoint.    at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]    at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: java.io.IOException: Unknown implementation of StreamStateHandle: 
> class org.apache.flink.runtime.state.PlaceholderStreamStateHandle    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:699)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandleMap(MetadataV2V3SerializerBase.java:813)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateHandle(MetadataV2V3SerializerBase.java:344)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeKeyedStateCol(MetadataV2V3SerializerBase.java:269)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeSubtaskState(MetadataV2V3SerializerBase.java:262)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeSubtaskState(MetadataV3Serializer.java:142)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:122)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:146)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:100)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:87)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:82)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:333)
>  ~[flink-dist-1.16.0.jar:1.16.0]    at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1361)
>  ~[flink-dist-1.16.0.jar:1.16.0]    ... 7 more {code}
>  
> Prior to Flink 1.16, we did not observe this error. Since 
> `PlaceholderStreamStateHandle` is used to indicate it's a reusable RocksDB 
> data for incremental checkpoint, we believe that the new improvements of 
> incremental checkpoint introduced in flink 1.16 release might be related to 
> this issue.
> We require assistance in investigating this issue and finding a solution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to