[ https://issues.apache.org/jira/browse/FLINK-33932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805398#comment-17805398 ]
xiangyu feng commented on FLINK-33932:
--------------------------------------
[~masteryhx] Thanks for your reply. I've talked to [~dianer17] about updating the
description of this issue. Would you kindly assign this issue to me? I would
also like to hear more from you about this issue in the discussion thread of
FLIP-414: [https://lists.apache.org/thread/om4kgd6trx2lctwm6x92q2kdjngxtz9k]
> Support Retry Mechanism in RocksDBStateDataTransfer
> ---------------------------------------------------
>
> Key: FLINK-33932
> URL: https://issues.apache.org/jira/browse/FLINK-33932
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Guojun Li
> Priority: Major
> Labels: pull-request-available
>
> Currently, there is no retry mechanism for downloading and uploading RocksDB
> state files. Any jitter in the remote filesystem might lead to a checkpoint
> failure. By supporting a retry mechanism in RocksDBStateDataTransfer, we can
> significantly reduce the checkpoint failure rate during the asynchronous phase.
> The exception is shown below:
> {noformat}
>
> 2023-12-19 08:46:00,197 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline
> checkpoint 2 by task
> 5b008c2c048fa8534d648455e69f9497_fbcce2f96483f8f15e87dc6c9afd372f_183_0 of
> job ffffffffa025f19e0000000000000000 at
> application-6f1c6e3d-1702480803995-5093022-taskmanager-1-1 @
> fdbd:dc61:1a:101:0:0:0:36 (dataPort=38789).
> org.apache.flink.util.SerializedThrowable:
> org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task
> checkpoint failed.
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
> Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception:
> Could not materialize checkpoint 2 for operator GlobalGroupAggregate[132] ->
> Calc[133] (184/500)#0.
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> ... 4 more
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.util.concurrent.ExecutionException: java.io.IOException: Could not flush
> to file and close the file system output stream to
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the
> stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
> at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
> at
> org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:544)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException:
> Could not flush to file and close the file system output stream to
> hdfs://xxx/shared/97329cbd-fb90-45ca-92de-edf52d6e6fc0 in order to obtain the
> stream state handle
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:516)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:157)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:113)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:32)
> ~[flink-dist-1.17-xxx-SNAPSHOT.jar:1.17-xxx-SNAPSHOT]
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> ~[?:?]
> ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable:
> java.net.ConnectException: Connection timed out
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
> ~[?:?]
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
> ~[hadoop-common-2.6.0-cdh5.4.4.jar:?]
> at
> org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1835)
> ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1268)
> ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1257)
> ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1414)
> ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1149)
> ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652)
> ~[hadoop-hdfs-2.6.0-cdh5.4.4.jar:?]
> {noformat}
> We can add a retry mechanism to the RocksDB uploader to decrease the failure
> rate of checkpoints in the asynchronous phase.
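A minimal sketch of the retry idea described above, assuming a bounded number of attempts with exponential backoff and treating any `IOException` (which includes the `java.net.ConnectException` at the bottom of the trace) as transient. `RetryingTransfer`, `callWithRetry`, `maxAttempts`, `initialBackoffMillis` and the backoff cap are hypothetical names chosen for illustration; they are not Flink's API and not necessarily what FLIP-414 or the linked pull request implement. Because the failure in the trace surfaces in `closeAndGetHandle`, the wrapped action should redo the whole transfer on each attempt (open a fresh checkpoint output stream and copy the file again) rather than reuse a half-written stream.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.Callable;

/** Hypothetical helper, not part of Flink: retries an I/O action with exponential backoff. */
public final class RetryingTransfer {

    // Hypothetical upper bound so the doubling backoff cannot grow without limit.
    private static final long MAX_BACKOFF_MILLIS = 10_000L;

    private RetryingTransfer() {}

    /**
     * Runs {@code action}, retrying on IOException for at most {@code maxAttempts} total attempts.
     * The action must be safe to repeat, e.g. it re-opens its output stream on every call.
     */
    public static <T> T callWithRetry(Callable<T> action, int maxAttempts, long initialBackoffMillis)
            throws Exception {
        if (maxAttempts < 1 || initialBackoffMillis < 0) {
            throw new IllegalArgumentException("maxAttempts must be >= 1 and backoff >= 0");
        }
        long backoff = initialBackoffMillis;
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                // Assumption: I/O errors such as connection timeouts are transient and worth retrying.
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff);
                    backoff = Math.min(backoff * 2, MAX_BACKOFF_MILLIS);
                }
            }
        }
        // All attempts failed: surface the last I/O error so the checkpoint is declined as before.
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated upload that times out twice before succeeding.
        String handle = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new ConnectException("Connection timed out");
            }
            return "stream-state-handle";
        }, 5, 10L);
        System.out.println(handle + " after " + calls[0] + " attempts");
    }
}
```

Running `main` prints `stream-state-handle after 3 attempts`. In the uploader, such a wrapper would presumably go around the per-file upload inside each asynchronous future, so a transient HDFS timeout costs a retry instead of declining the whole checkpoint; non-I/O exceptions are deliberately not retried.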
--
This message was sent by Atlassian Jira
(v8.20.10#820010)