azagrebin commented on a change in pull request #7351: [FLINK-11008][State 
Backends, Checkpointing]SpeedUp upload state files using multithread
URL: https://github.com/apache/flink/pull/7351#discussion_r248348274
 
 

 ##########
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##########
 @@ -61,6 +80,116 @@ static void transferAllStateDataToDirectory(
                downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
        }
 
+       /**
+        * Upload all the files to checkpoint fileSystem using specified number of threads.
+        *
+        * @param files The files will be uploaded to checkpoint filesystem.
+        * @param numberOfSnapshottingThreads The number of threads used to upload the files.
+        * @param checkpointStreamFactory The checkpoint streamFactory used to create outputstream.
+        * @param closeableRegistry
+        *
+        * @throws Exception Thrown if can not upload all the files.
+        */
+       public static Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs(
+               @Nonnull Map<StateHandleID, Path> files,
+               int numberOfSnapshottingThreads,
+               CheckpointStreamFactory checkpointStreamFactory,
+               CloseableRegistry closeableRegistry) throws Exception {
+
+               Map<StateHandleID, StreamStateHandle> handles = new HashMap<>();
+
+               ExecutorService executorService = createExecutorService(numberOfSnapshottingThreads);
 
 Review comment:
   @klion26 
   The upload and download parts are quite independent. A base class could
   contain the executor, the number of threads, the closeable registry, and a
   close method (to shut down the executor once instead of shutting it down
   every time). The upload and download classes can extend it and share the
   executor. The downloader could then be registered with the closeable
   registry. For upload, we can of course shut it down immediately after
   restore is done. I agree that it would make sense to keep the threads up,
   and for code reuse as well.
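
   A rough sketch of the base-class idea, only to illustrate the shape. All
   names here (`StateDataTransferBase`, `StateUploaderSketch`, `runAll`,
   `uploadAll`, `isClosed`) are hypothetical and not taken from the PR. Plain
   `java.io.Closeable` stands in for anything Flink-specific, the "upload" is
   a placeholder that returns a string, and each instance owns its own pool,
   which is an assumption rather than a requirement:

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical base class: owns the executor and shuts it down once, in close(). */
abstract class StateDataTransferBase implements Closeable {
    protected final ExecutorService executorService;
    protected final int numberOfThreads;

    StateDataTransferBase(int numberOfThreads) {
        this.numberOfThreads = numberOfThreads;
        this.executorService = Executors.newFixedThreadPool(numberOfThreads);
    }

    /** Runs all tasks on the long-lived executor and waits for every result. */
    protected <T> List<T> runAll(List<Callable<T>> tasks) throws Exception {
        List<T> results = new ArrayList<>();
        // invokeAll returns futures in the same order as the submitted tasks.
        for (Future<T> future : executorService.invokeAll(tasks)) {
            results.add(future.get());
        }
        return results;
    }

    /** Closeable, so an instance could be registered with a closeable registry. */
    @Override
    public void close() {
        executorService.shutdownNow();
    }

    boolean isClosed() {
        return executorService.isShutdown();
    }
}

/** Hypothetical uploader; a downloader would extend the base class the same way. */
final class StateUploaderSketch extends StateDataTransferBase {
    StateUploaderSketch(int numberOfThreads) {
        super(numberOfThreads);
    }

    List<String> uploadAll(List<String> fileNames) throws Exception {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String name : fileNames) {
            // Placeholder for the real upload: just derive a fake handle name.
            tasks.add(() -> "handle:" + name);
        }
        return runAll(tasks);
    }
}
```

   With something like this, the executor is created once per instance and
   released in `close()`, so callers would not need to create and shut down a
   pool on every transfer.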
