azagrebin commented on a change in pull request #7351: [FLINK-11008][State
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r250625564
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
##########
@@ -36,43 +36,47 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static
org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
/**
- * Data transfer utils for {@link RocksDBKeyedStateBackend}.
+ * Help class for downloading RocksDBState.
*/
-class RocksDbStateDataTransfer {
+public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
+ public RocksDBStateDownloader(int restoringThreadNum) {
+ super(restoringThreadNum);
+ }
- static void transferAllStateDataToDirectory(
+ /**
+ * Transfer all state data to the target directory using specified
number of threads.
+ *
+ * @param restoreStateHandle Handles used to retrieve the state data.
+ * @param dest The target directory which the state data will be stored.
+ * @param closeableRegistry Which all the inputStream/outputStream will
be registered and unregistered.
+ *
+ * @throws Exception Thrown if can not transfer all the state data.
+ */
+ public void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest,
- int restoringThreadNum,
CloseableRegistry closeableRegistry) throws Exception {
final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();
- downloadDataForAllStateHandles(sstFiles, dest,
restoringThreadNum, closeableRegistry);
- downloadDataForAllStateHandles(miscFiles, dest,
restoringThreadNum, closeableRegistry);
+ downloadDataForAllStateHandles(sstFiles, dest,
closeableRegistry);
Review comment:
OK, I see the problem now. Thanks for pointing it out.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services