klion26 commented on a change in pull request #7351: [FLINK-11008][State
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250477821
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
##########
@@ -36,43 +36,47 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
/**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
*/
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+ public RocksDBStateDownloader(int restoringThreadNum) {
+ super(restoringThreadNum);
+ }
- static void transferAllStateDataToDirectory(
+ /**
+ * Transfer all state data to the target directory using specified number of threads.
+ *
+ * @param restoreStateHandle Handles used to retrieve the state data.
+ * @param dest The target directory which the state data will be stored.
+ * @param closeableRegistry Which all the inputStream/outputStream will be registered and unregistered.
+ *
+ * @throws Exception Thrown if can not transfer all the state data.
+ */
+ public void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest,
- int restoringThreadNum,
CloseableRegistry closeableRegistry) throws Exception {
final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();
- downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
- downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
+ downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry);
Review comment:
@azagrebin We cannot share `closeableRegistry` because we need to support
parallel snapshots. If the `closeableRegistry` is shared across parallel
snapshots, the snapshot that completes later fails with `IOException("Cannot
register Closeable, registry is already closed. Closing argument.")` when it
registers its input/output streams with the registry, because the
`closeableRegistry` has already been closed in
`AsyncSnapshotCallable#closeSnapshotIO`. See the [Travis
log](https://travis-ci.org/apache/flink/jobs/483728783).
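
To make the failure mode concrete, here is a minimal sketch. `SimpleRegistry` is a hypothetical stand-in written for this illustration, not Flink's `CloseableRegistry`; it only mimics the behavior described in this comment (registering with an already closed registry closes the argument and throws). The class and method names are assumptions, not code from this PR.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class SharedRegistryDemo {

    /** Hypothetical stand-in for a closeable registry; NOT Flink's CloseableRegistry. */
    static final class SimpleRegistry implements Closeable {
        private final Set<Closeable> resources = new HashSet<>();
        private boolean closed;

        synchronized void register(Closeable c) throws IOException {
            if (closed) {
                // Mimic the behavior quoted in the comment: close the argument, then fail.
                c.close();
                throw new IOException(
                    "Cannot register Closeable, registry is already closed. Closing argument.");
            }
            resources.add(c);
        }

        @Override
        public synchronized void close() throws IOException {
            closed = true;
            for (Closeable c : resources) {
                c.close();
            }
            resources.clear();
        }
    }

    /**
     * Simulates two overlapping snapshots. The first one finishes and closes its
     * registry; the second one then tries to register another stream.
     * Returns true if the second snapshot fails to register.
     */
    static boolean secondSnapshotFails(SimpleRegistry first, SimpleRegistry second)
            throws IOException {
        first.register(() -> { });   // stream opened by the first snapshot
        second.register(() -> { });  // stream opened by the second snapshot
        first.close();               // first snapshot completes and closes its registry
        try {
            second.register(() -> { }); // second snapshot is still running
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        SimpleRegistry shared = new SimpleRegistry();
        System.out.println("shared registry fails: "
            + secondSnapshotFails(shared, shared));
        System.out.println("separate registries fail: "
            + secondSnapshotFails(new SimpleRegistry(), new SimpleRegistry()));
    }
}
```

With one shared instance the second registration throws, while two separate instances do not interfere with each other, which is why each snapshot needs its own registry.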