Hi Devs,
I am very new to the Flink code base and am evaluating the checkpointing
strategy.
In my current setup I am using an NFS-based file system as the checkpoint
store (the NAS/NFS has very high throughput, over 2 GB/s on a single node,
and I am using 12 NFS servers).
When pushing the system to a relatively medium scale (120 subtasks over 6
workers, with a total state of 100 GB), I observe that the JobManager takes
over 2 minutes to finalize the checkpoint (observed in the UI and in CPU
profiling of the JM; see the flame graph of a 30-second sample).
As the attached flame graphs show, the JM is very busy serializing the
metadata
(org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.serializeOperatorState
(2,875 samples, 99.65%)).
Now the question is why this metadata file is so big, on the order of 3 GB
in my case.
How does its size scale? num_of_tasks * num_states?
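A quick back-of-the-envelope check of the numbers in this mail, assuming
`ls -h` reports binary units (3.0G is roughly 3.0 GiB) and that the size is
spread evenly over the 120 subtasks: each subtask would contribute about
25.6 MiB to _metadata on average. That seems large for what should mostly be
references to state files, which is why the scaling question matters. The
helper name below is hypothetical.

```python
# Back-of-the-envelope: average _metadata bytes contributed per subtask.
# Assumes `ls -h` uses binary units (3.0G == 3.0 GiB) and an even spread
# across subtasks; both are assumptions, not measurements.

GIB = 1024 ** 3
MIB = 1024 ** 2


def per_subtask_mib(metadata_bytes: float, num_subtasks: int) -> float:
    """Average metadata size per subtask, in MiB (hypothetical helper)."""
    if num_subtasks <= 0:
        raise ValueError("num_subtasks must be positive")
    return metadata_bytes / num_subtasks / MIB


if __name__ == "__main__":
    # Numbers from this mail: 3.0G _metadata file, 120 subtasks.
    print(f"{per_subtask_mib(3.0 * GIB, 120):.1f} MiB per subtask")
```

A real breakdown would need per-operator handle sizes; this only shows the
average scale.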
/opt/flink/pv/checkpoints/c0580ec8f55fcf1e0ceaa46fc3778b99/chk-1
bash-4.2$ ls -all -h
-rw-r--r-- 1 flink flink 3.0G Nov 17 01:42 _metadata
The second question: how can I better measure the time the JM takes to
commit the checkpoint, i.e. time_done_checkpoint - time_got_all_acks_from_tm?
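One rough way to approximate that interval, assuming the _metadata file is
written as the last step of finalization so its modification time marks
completion, is to subtract latest_ack_timestamp (from the checkpoint stats
JSON) from the file's mtime. This is only a sketch under that assumption:
the function name is hypothetical, and the JM and NFS server clocks must be
in sync for the result to mean anything.

```python
import json
import os
import sys


def finalize_ms(stats_json: str, metadata_path: str) -> int:
    """Approximate JM finalization time in milliseconds.

    Hypothetical helper: assumes the _metadata mtime marks checkpoint
    completion and that the JM and NFS server clocks agree.
    """
    stats = json.loads(stats_json)
    last_ack_ms = stats["latest_ack_timestamp"]
    # st_mtime_ns avoids float rounding; integer-divide ns down to ms.
    written_ms = os.stat(metadata_path).st_mtime_ns // 1_000_000
    return written_ms - last_ack_ms


if __name__ == "__main__" and len(sys.argv) == 3:
    # Usage: python finalize.py stats.json /path/to/chk-N/_metadata
    with open(sys.argv[1]) as f:
        print(finalize_ms(f.read(), sys.argv[2]), "ms")
```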
Is there a config flag I am missing that would make this last step faster?
My current checkpoint configuration:
state.backend: rocksdb
# These paths must use the same PV mount path as <mountPath: "/opt/flink/pv">
state.checkpoints.dir: file:///opt/flink/pv/checkpoints
state.savepoints.dir: file:///opt/flink/pv/savepoints
state.backend.incremental: true
# https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#checkpointing
execution.checkpointing.interval: 60000
execution.checkpointing.mode: AT_LEAST_ONCE
# Raised after hitting: "The rpc invocation size 19598830 exceeds the maximum akka framesize."
akka.framesize: 100485760b
# https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#heartbeat-timeout
heartbeat.timeout: 70000
# https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#execution-checkpointing-timeout
execution.checkpointing.timeout: 15minutes
Some metadata about the checkpoint:
{"@class":"completed","id":1,"status":"COMPLETED","is_savepoint":false,"trigger_timestamp":1605315046120,"latest_ack_timestamp":1605315093466,"state_size":12239786229,"end_to_end_duration":47346,"alignment_buffered":0,"num_subtasks":120,"num_acknowledged_subtasks":120,"tasks":{},"external_path":"file:/opt/flink/pv/checkpoints/7474752476036c14d7fdeb4e86af3638/chk-1"}
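Checking the arithmetic on these stats: end_to_end_duration (47346 ms) is
exactly latest_ack_timestamp - trigger_timestamp. As far as these numbers
show, the reported duration therefore stops at the last subtask
acknowledgement and does not cover the time the JM then spends writing
_metadata. A minimal check, using only fields copied from the JSON above
(the helper name is hypothetical):

```python
import json

# Subset of the checkpoint stats quoted above.
STATS = (
    '{"trigger_timestamp":1605315046120,'
    '"latest_ack_timestamp":1605315093466,'
    '"end_to_end_duration":47346}'
)


def ack_span_ms(stats: dict) -> int:
    """Time from checkpoint trigger to the last subtask ack, in ms."""
    return stats["latest_ack_timestamp"] - stats["trigger_timestamp"]


if __name__ == "__main__":
    stats = json.loads(STATS)
    # Both values printed should match.
    print(ack_span_ms(stats), stats["end_to_end_duration"])
```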