pnowojski commented on code in PR #23986:
URL: https://github.com/apache/flink/pull/23986#discussion_r1441986930


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -107,13 +122,43 @@ private List<CompletableFuture<HandleAndLocalPath>> 
createUploadFutures(
                         e ->
                                 CompletableFuture.supplyAsync(
                                         CheckedSupplier.unchecked(
-                                                () ->
-                                                        
uploadLocalFileToCheckpointFs(
-                                                                e,
-                                                                
checkpointStreamFactory,
-                                                                stateScope,
-                                                                
closeableRegistry,
-                                                                
tmpResourcesRegistry)),
+                                                () -> {

Review Comment:
   Please extract this lambda body into a separate method.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -28,31 +28,46 @@
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.concurrent.FixedRetryStrategy;
 import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
 import org.apache.flink.util.function.CheckedSupplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** Help class for uploading RocksDB state files. */
 public class RocksDBStateUploader extends RocksDBStateDataTransfer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateUploader.class);
+
     private static final int READ_BUFFER_SIZE = 16 * 1024;
 
+    private static final int DEFAULT_RETRY_TIMES = 3;
+
+    private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(1L);

Review Comment:
   This should be configurable and, in the first release, should most likely 
default to the current behaviour.



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -107,13 +122,43 @@ private List<CompletableFuture<HandleAndLocalPath>> 
createUploadFutures(
                         e ->
                                 CompletableFuture.supplyAsync(
                                         CheckedSupplier.unchecked(
-                                                () ->
-                                                        
uploadLocalFileToCheckpointFs(
-                                                                e,
-                                                                
checkpointStreamFactory,
-                                                                stateScope,
-                                                                
closeableRegistry,
-                                                                
tmpResourcesRegistry)),
+                                                () -> {
+                                                    RetryStrategy 
retryStrategy =
+                                                            new 
FixedRetryStrategy(
+                                                                    
DEFAULT_RETRY_TIMES,
+                                                                    
DEFAULT_RETRY_DELAY);
+                                                    while (true) {
+                                                        try {
+                                                            return 
uploadLocalFileToCheckpointFs(
+                                                                    e,
+                                                                    
checkpointStreamFactory,
+                                                                    stateScope,
+                                                                    
closeableRegistry,
+                                                                    
tmpResourcesRegistry);
+                                                        } catch (Throwable t) {
+                                                            if (retryStrategy
+                                                                            
.getNumRemainingRetries()
+                                                                    == 0) {
+                                                                throw t;
+                                                            }
+
+                                                            LOG.warn(
+                                                                    "Fail to 
upload file to HDFS, will retry {} times, left retry times {}.",
+                                                                    
DEFAULT_RETRY_TIMES,
+                                                                    
retryStrategy
+                                                                            
.getNumRemainingRetries(),
+                                                                    t);
+                                                        }
+
+                                                        
TimeUnit.MILLISECONDS.sleep(
+                                                                retryStrategy
+                                                                        
.getRetryDelay()
+                                                                        
.toMillis());

Review Comment:
   It would be better not to wait synchronously for the retry delay, and 
instead free up the thread so the thread pool can do other work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to