pnowojski commented on code in PR #23986:
URL: https://github.com/apache/flink/pull/23986#discussion_r1441986930
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -107,13 +122,43 @@ private List<CompletableFuture<HandleAndLocalPath>> createUploadFutures(
         e ->
                 CompletableFuture.supplyAsync(
                         CheckedSupplier.unchecked(
-                                () ->
-                                        uploadLocalFileToCheckpointFs(
-                                                e,
-                                                checkpointStreamFactory,
-                                                stateScope,
-                                                closeableRegistry,
-                                                tmpResourcesRegistry)),
+                                () -> {
Review Comment:
Please extract this lambda body into a separate method.
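For illustration, a minimal JDK-only sketch of what the extraction could look like. `UploadRetrySketch`, `uploadWithRetries` and `createUploadFuture` are hypothetical names, and a plain `Callable` stands in for the real `uploadLocalFileToCheckpointFs(...)` call and its arguments. To keep the focus on moving the loop out of the `supplyAsync` lambda, this sketch retries immediately, without any delay.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical helper: names are illustrative and not part of Flink's API.
final class UploadRetrySketch {

    private UploadRetrySketch() {}

    /**
     * Runs the upload and, on failure, retries it up to {@code maxRetries} more
     * times (immediately, without any delay). Rethrows the last failure.
     */
    static <T> T uploadWithRetries(Callable<T> upload, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0: " + maxRetries);
        }
        int remaining = maxRetries;
        while (true) {
            try {
                return upload.call();
            } catch (Exception ex) {
                if (remaining == 0) {
                    throw ex; // retries exhausted: surface the last failure
                }
                remaining--; // consume one retry before trying again
            }
        }
    }

    /** With the loop extracted, the async call site is reduced to a single call. */
    static <T> CompletableFuture<T> createUploadFuture(
            Callable<T> upload, int maxRetries, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        return uploadWithRetries(upload, maxRetries);
                    } catch (Exception ex) {
                        // supplyAsync takes a Supplier, so checked exceptions must be wrapped
                        throw new CompletionException(ex);
                    }
                },
                executor);
    }
}
```

With the loop in its own method, the retry behaviour can also be unit-tested on its own, without a checkpoint stream factory or registries.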
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -28,31 +28,46 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.concurrent.FixedRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.RetryStrategy;
import org.apache.flink.util.function.CheckedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** Help class for uploading RocksDB state files. */
public class RocksDBStateUploader extends RocksDBStateDataTransfer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateUploader.class);
+
private static final int READ_BUFFER_SIZE = 16 * 1024;
+ private static final int DEFAULT_RETRY_TIMES = 3;
+
+ private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(1L);
Review Comment:
This should be configurable, and in the first release it should most likely default to the current behaviour (no retries).
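A minimal sketch of such a setting, assuming it is read from a plain key/value map. The keys and the `UploadRetryOptions` class are hypothetical and are not existing Flink options; a real change would presumably declare them through Flink's own configuration mechanism. A default of 0 retries keeps the pre-PR behaviour of a single upload attempt, while the 1 second default delay mirrors `DEFAULT_RETRY_DELAY` from the diff.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical settings holder; the keys below are illustrative, not real Flink options.
final class UploadRetryOptions {

    static final String MAX_RETRIES_KEY = "rocksdb.upload.max-retries"; // hypothetical key
    static final String RETRY_DELAY_MS_KEY = "rocksdb.upload.retry-delay-ms"; // hypothetical key

    /** 0 retries keeps today's behaviour: one upload attempt, failure propagates. */
    static final int DEFAULT_MAX_RETRIES = 0;
    static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(1);

    final int maxRetries;
    final Duration retryDelay;

    private UploadRetryOptions(int maxRetries, Duration retryDelay) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("max retries must be >= 0: " + maxRetries);
        }
        if (retryDelay.isNegative()) {
            throw new IllegalArgumentException("retry delay must be >= 0: " + retryDelay);
        }
        this.maxRetries = maxRetries;
        this.retryDelay = retryDelay;
    }

    /** Reads the settings, falling back to the defaults for absent keys. */
    static UploadRetryOptions fromConfig(Map<String, String> conf) {
        String retries = conf.get(MAX_RETRIES_KEY);
        String delayMs = conf.get(RETRY_DELAY_MS_KEY);
        int maxRetries =
                retries == null ? DEFAULT_MAX_RETRIES : Integer.parseInt(retries.trim());
        Duration delay =
                delayMs == null
                        ? DEFAULT_RETRY_DELAY
                        : Duration.ofMillis(Long.parseLong(delayMs.trim()));
        return new UploadRetryOptions(maxRetries, delay);
    }
}
```

Keeping the retry count at 0 by default means users who do not opt in see exactly the old single-attempt upload.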
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -107,13 +122,43 @@ private List<CompletableFuture<HandleAndLocalPath>> createUploadFutures(
         e ->
                 CompletableFuture.supplyAsync(
                         CheckedSupplier.unchecked(
-                                () ->
-                                        uploadLocalFileToCheckpointFs(
-                                                e,
-                                                checkpointStreamFactory,
-                                                stateScope,
-                                                closeableRegistry,
-                                                tmpResourcesRegistry)),
+                                () -> {
+                                    RetryStrategy retryStrategy =
+                                            new FixedRetryStrategy(
+                                                    DEFAULT_RETRY_TIMES,
+                                                    DEFAULT_RETRY_DELAY);
+                                    while (true) {
+                                        try {
+                                            return uploadLocalFileToCheckpointFs(
+                                                    e,
+                                                    checkpointStreamFactory,
+                                                    stateScope,
+                                                    closeableRegistry,
+                                                    tmpResourcesRegistry);
+                                        } catch (Throwable t) {
+                                            if (retryStrategy
+                                                            .getNumRemainingRetries()
+                                                    == 0) {
+                                                throw t;
+                                            }
+
+                                            LOG.warn(
+                                                    "Fail to upload file to HDFS, will retry {} times, left retry times {}.",
+                                                    DEFAULT_RETRY_TIMES,
+                                                    retryStrategy
+                                                            .getNumRemainingRetries(),
+                                                    t);
+                                        }
+
+                                        TimeUnit.MILLISECONDS.sleep(
+                                                retryStrategy
+                                                        .getRetryDelay()
+                                                        .toMillis());
Review Comment:
It would be better not to wait synchronously for the retry delay, and instead to free up the thread pool to do other things in the meantime.
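A minimal JDK-only sketch of the non-blocking alternative: instead of sleeping on the upload thread, each failed attempt asks a `ScheduledExecutorService` to resubmit the upload after the delay, so the pool thread is released immediately. `AsyncRetrySketch`, `retryWithDelay` and the split into separate `ioExecutor` and `scheduler` parameters are assumptions made for illustration; a plain `Supplier` stands in for the real upload call.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper; names and parameters are illustrative, not Flink API.
final class AsyncRetrySketch {

    private AsyncRetrySketch() {}

    /**
     * Runs {@code attempt} on {@code ioExecutor}. On failure it schedules another
     * attempt after {@code delay} (up to {@code maxRetries} times) instead of
     * sleeping, so the thread that ran the failed attempt is released at once.
     */
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<T> attempt,
            int maxRetries,
            Duration delay,
            Executor ioExecutor,
            ScheduledExecutorService scheduler) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0: " + maxRetries);
        }
        CompletableFuture<T> result = new CompletableFuture<>();
        runAttempt(attempt, maxRetries, delay, ioExecutor, scheduler, result);
        return result;
    }

    private static <T> void runAttempt(
            Supplier<T> attempt,
            int retriesLeft,
            Duration delay,
            Executor ioExecutor,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> attemptFuture;
        try {
            attemptFuture = CompletableFuture.supplyAsync(attempt, ioExecutor);
        } catch (RejectedExecutionException rejected) {
            result.completeExceptionally(rejected); // executor shut down: give up
            return;
        }
        attemptFuture.whenComplete(
                (value, error) -> {
                    if (error == null) {
                        result.complete(value);
                        return;
                    }
                    // supplyAsync wraps the thrown exception; report the original one
                    Throwable cause =
                            error instanceof CompletionException && error.getCause() != null
                                    ? error.getCause()
                                    : error;
                    if (retriesLeft == 0) {
                        result.completeExceptionally(cause);
                        return;
                    }
                    try {
                        // The timer fires later; no pool thread waits in the meantime.
                        scheduler.schedule(
                                () -> runAttempt(
                                        attempt, retriesLeft - 1, delay,
                                        ioExecutor, scheduler, result),
                                delay.toMillis(),
                                TimeUnit.MILLISECONDS);
                    } catch (RejectedExecutionException rejected) {
                        cause.addSuppressed(rejected);
                        result.completeExceptionally(cause);
                    }
                });
    }
}
```

Flink's `FutureUtils` also appears to offer `retryWithDelay` variants that take a `RetryStrategy` and a scheduled executor; if they fit this call site, reusing them would likely be preferable to a hand-rolled loop.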
--
This is an automated message from the Apache Git Service.