klion26 commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250477821
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 ##########
 @@ -36,43 +36,47 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
 
 /**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
  */
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+       public RocksDBStateDownloader(int restoringThreadNum) {
+               super(restoringThreadNum);
+       }
 
-       static void transferAllStateDataToDirectory(
+       /**
+        * Transfer all state data to the target directory using specified 
number of threads.
+        *
+        * @param restoreStateHandle Handles used to retrieve the state data.
+        * @param dest The target directory which the state data will be stored.
+        * @param closeableRegistry Which all the inputStream/outputStream will 
be registered and unregistered.
+        *
+        * @throws Exception Thrown if can not transfer all the state data.
+        */
+       public void transferAllStateDataToDirectory(
                IncrementalKeyedStateHandle restoreStateHandle,
                Path dest,
-               int restoringThreadNum,
                CloseableRegistry closeableRegistry) throws Exception {
 
                final Map<StateHandleID, StreamStateHandle> sstFiles =
                        restoreStateHandle.getSharedState();
                final Map<StateHandleID, StreamStateHandle> miscFiles =
                        restoreStateHandle.getPrivateState();
 
-               downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
-               downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+               downloadDataForAllStateHandles(sstFiles, dest, 
closeableRegistry);
 
 Review comment:
   @azagrebin We can not share `closeableRegistry` because of supportting 
parallel snapshot. If we share the `closeableRegistry` when parallel snapshot, 
the later complete snapshot will come into an Exception `IOException("Cannot 
register Closeable, registry is already closed. Closing argument.")`(The 
`closeableRegistry` was close in `AsyncSnapshotCallable#closeSnapshotIO`) when 
registering the input/outputstream to the registry, such as the [Travis log 
said](https://travis-ci.org/apache/flink/jobs/483728783).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to