This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 848c4bc3657ca559222597bcc614cba8bfd1c3ca
Author: Ritik Raj <[email protected]>
AuthorDate: Fri Nov 7 22:08:24 2025 +0530

    [ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel downloader
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
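    Details:
    Added retries with an ExponentialRetryPolicy to downloadDirectories
    in the cloud parallel downloaders (S3, S3 sync, Azure, GCS) via the
    new AbstractParallelDownloader.downloadDirectoriesWithRetry.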
    
    Ext-ref: MB-69226
    Change-Id: Ica47227218a323ededced75691d3f64070c97729
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556
    Reviewed-by: Murtadha Hubail <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Ritik Raj <[email protected]>
---
 .../asterix/cloud/AbstractCloudIOManager.java      |  4 +-
 .../apache/asterix/cloud/EagerCloudIOManager.java  |  2 +-
 .../apache/asterix/cloud/LazyCloudIOManager.java   |  2 +-
 .../cloud/clients/AbstractParallelDownloader.java  | 84 ++++++++++++++++++++++
 .../asterix/cloud/clients/IParallelDownloader.java |  2 +-
 .../cloud/clients/aws/s3/S3ParallelDownloader.java | 17 ++---
 .../cloud/clients/aws/s3/S3SyncDownloader.java     | 14 ++--
 .../azure/blobstorage/AzureParallelDownloader.java |  9 ++-
 .../clients/google/gcs/GCSParallelDownloader.java  |  7 +-
 .../cloud/util/CloudRetryableRequestUtil.java      |  4 +-
 10 files changed, 108 insertions(+), 37 deletions(-)

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 6fc271e382..23df3499d0 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -192,7 +192,7 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
     public void downloadLibrary(Collection<FileReference> libPath) throws 
HyracksDataException {
         try (IParallelDownloader downloader = 
cloudClient.createParallelDownloader(bucket, localIoManager)) {
             LOGGER.info("Downloading all files located in {}", libPath);
-            downloader.downloadDirectories(libPath);
+            downloader.downloadDirectoriesWithRetry(libPath);
             LOGGER.info("Finished downloading {}", libPath);
         }
     }
@@ -202,7 +202,7 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
             FileReference appDir = resolveAbsolutePath(
                     localIoManager.getWorkspacePath(0).getPath() + 
File.separator + APPLICATION_ROOT_DIR_NAME);
             LOGGER.info("Downloading all libraries in + {}", appDir);
-            downloader.downloadDirectories(Collections.singletonList(appDir));
+            
downloader.downloadDirectoriesWithRetry(Collections.singletonList(appDir));
             LOGGER.info("Finished downloading all libraries");
         }
     }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077c70..59e21ac14c 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -62,7 +62,7 @@ final class EagerCloudIOManager extends 
AbstractCloudIOManager {
     protected void downloadPartitions(boolean metadataNode, int 
metadataPartition) throws HyracksDataException {
         IParallelDownloader downloader = 
cloudClient.createParallelDownloader(bucket, localIoManager);
         LOGGER.info("Downloading all files located in {}", partitionPaths);
-        downloader.downloadDirectories(partitionPaths);
+        downloader.downloadDirectoriesWithRetry(partitionPaths);
         LOGGER.info("Finished downloading {}", partitionPaths);
     }
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9c32..999a6b6698 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -156,7 +156,7 @@ final class LazyCloudIOManager extends 
AbstractCloudIOManager {
             LOGGER.info("Downloading metadata partition {}, Current uncached 
files: {}", metadataPartition,
                     uncachedFiles);
             FileReference metadataDir = resolve(STORAGE_ROOT_DIR_NAME + 
File.separator + partitionDir);
-            downloader.downloadDirectories(Collections.singleton(metadataDir));
+            
downloader.downloadDirectoriesWithRetry(Collections.singleton(metadataDir));
             uncachedFiles.removeIf(f -> 
f.getRelativePath().contains(partitionDir));
             LOGGER.info("Finished downloading metadata partition. Current 
uncached files: {}", uncachedFiles);
         }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
new file mode 100644
index 0000000000..29a50a6d14
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.hyracks.util.IRetryPolicy;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractParallelDownloader implements 
IParallelDownloader {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public void downloadDirectoriesWithRetry(Collection<FileReference> 
toDownload) throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>(toDownload);
+        IRetryPolicy retryPolicy = new 
ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
+                CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES);
+        int attempt = 1;
+        while (true) {
+            try {
+                failedFiles = downloadDirectories(toDownload);
+
+                if (failedFiles.isEmpty()) {
+                    return;
+                }
+
+                if (!retryPolicy.retry(null)) {
+                    LOGGER.error("Exhausted retries ({}) — failed to download 
{} directories: {}",
+                            CloudRetryableRequestUtil.NUMBER_OF_RETRIES, 
failedFiles.size(), failedFiles);
+                    throw 
HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION);
+                }
+
+                LOGGER.warn("Failed to download directories (attempt {}/{}), 
retrying. Remaining: {}", attempt,
+                        CloudRetryableRequestUtil.NUMBER_OF_RETRIES, 
failedFiles.size());
+            } catch (IOException | ExecutionException | InterruptedException 
e) {
+                if (ExceptionUtils.causedByInterrupt(e) && 
!Thread.currentThread().isInterrupted()) {
+                    LOGGER.warn("Lost suppressed interrupt during 
downloadDirectory retry", e);
+                    throw HyracksDataException.create(e);
+                }
+                try {
+                    if (!retryPolicy.retry(e)) {
+                        LOGGER.error("Exhausted retries ({}) — failed to 
download {} directories: {}",
+                                CloudRetryableRequestUtil.NUMBER_OF_RETRIES, 
failedFiles.size(), failedFiles);
+                        throw HyracksDataException.create(e);
+                    }
+                } catch (InterruptedException e1) {
+                    throw HyracksDataException.create(e1);
+                }
+                LOGGER.warn("Failed to downloadDirectories, performing {}/{}", 
attempt,
+                        CloudRetryableRequestUtil.NUMBER_OF_RETRIES, e);
+            }
+            attempt++;
+        }
+    }
+
+    protected abstract Set<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
+            throws ExecutionException, InterruptedException, IOException;
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
index 6f1c4536e1..5f7ff0bf4a 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java
@@ -39,7 +39,7 @@ public interface IParallelDownloader extends AutoCloseable {
      * @param toDownload all files to be downloaded
      * @return file that failed to download
      */
-    Collection<FileReference> downloadDirectories(Collection<FileReference> 
toDownload) throws HyracksDataException;
+    void downloadDirectoriesWithRetry(Collection<FileReference> toDownload) 
throws HyracksDataException;
 
     /**
      * Close the downloader and release all of its resources
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 0321a35329..bfb52c9f70 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -30,7 +30,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -57,7 +57,7 @@ import software.amazon.awssdk.transfer.s3.model.FileDownload;
 import software.amazon.awssdk.utils.AttributeMap;
 
 @ThreadSafe
-class S3ParallelDownloader implements IParallelDownloader {
+class S3ParallelDownloader extends AbstractParallelDownloader {
     private final String bucket;
     private final IOManager ioManager;
     private final S3AsyncClient s3AsyncClient;
@@ -84,17 +84,10 @@ class S3ParallelDownloader implements IParallelDownloader {
     }
 
     @Override
-    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
-            throws HyracksDataException {
-        Set<FileReference> failedFiles;
+    public Set<FileReference> downloadDirectories(Collection<FileReference> 
toDownload)
+            throws HyracksDataException, ExecutionException, 
InterruptedException {
         List<CompletableFuture<CompletedDirectoryDownload>> downloads = 
startDownloadingDirectories(toDownload);
-        try {
-            failedFiles = waitForDirectoryDownloads(downloads);
-        } catch (ExecutionException | InterruptedException e) {
-            throw HyracksDataException.create(e);
-        }
-
-        return failedFiles;
+        return waitForDirectoryDownloads(downloads);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
index 53e3dadbc2..498c34b5dd 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -50,7 +50,7 @@ import 
software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Object;
 
 @ThreadSafe
-public class S3SyncDownloader implements IParallelDownloader {
+public class S3SyncDownloader extends AbstractParallelDownloader {
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final String bucket;
@@ -126,14 +126,10 @@ public class S3SyncDownloader implements 
IParallelDownloader {
     }
 
     @Override
-    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
-            throws HyracksDataException {
+    public Set<FileReference> downloadDirectories(Collection<FileReference> 
toDownload)
+            throws IOException, ExecutionException, InterruptedException {
         Set<FileReference> failedFiles;
-        try {
-            failedFiles = downloadDirectoriesAndWait(toDownload);
-        } catch (IOException | InterruptedException | ExecutionException e) {
-            throw HyracksDataException.create(e);
-        }
+        failedFiles = downloadDirectoriesAndWait(toDownload);
         return failedFiles;
     }
 
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 364fb2a313..12d8a192ca 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -29,7 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -44,7 +44,7 @@ import com.azure.storage.blob.models.ListBlobsOptions;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-public class AzureParallelDownloader implements IParallelDownloader {
+public class AzureParallelDownloader extends AbstractParallelDownloader {
     private final IOManager ioManager;
     private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final IRequestProfilerLimiter profiler;
@@ -101,8 +101,7 @@ public class AzureParallelDownloader implements 
IParallelDownloader {
     }
 
     @Override
-    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> directories)
-            throws HyracksDataException {
+    public Set<FileReference> downloadDirectories(Collection<FileReference> 
directories) throws HyracksDataException {
 
         Set<FileReference> failedFiles = new HashSet<>();
         List<Mono<Void>> directoryDownloads = new ArrayList<>();
@@ -196,4 +195,4 @@ public class AzureParallelDownloader implements 
IParallelDownloader {
             throw HyracksDataException.create(e);
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index b9e7eeea5c..574a9b6ebb 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.AbstractParallelDownloader;
 import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -51,7 +51,7 @@ import 
com.google.cloud.storage.transfermanager.TransferManager;
 import com.google.cloud.storage.transfermanager.TransferManagerConfig;
 import com.google.cloud.storage.transfermanager.TransferStatus;
 
-public class GCSParallelDownloader implements IParallelDownloader {
+public class GCSParallelDownloader extends AbstractParallelDownloader {
 
     private final String bucket;
     private final IOManager ioManager;
@@ -102,8 +102,7 @@ public class GCSParallelDownloader implements 
IParallelDownloader {
     }
 
     @Override
-    public Collection<FileReference> 
downloadDirectories(Collection<FileReference> toDownload)
-            throws HyracksDataException {
+    public Set<FileReference> downloadDirectories(Collection<FileReference> 
toDownload) {
         Set<FileReference> failedFiles = new HashSet<>();
         ParallelDownloadConfig.Builder config =
                 
ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
index e01190035e..5461ed5bd4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -53,8 +53,8 @@ public class CloudRetryableRequestUtil {
     private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
     private static final int UNSTABLE_MAX_DELAY_BETWEEN_RETRIES_IN_MILLIS = 0;
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
-    private static final long MAX_DELAY_BETWEEN_RETRIES = 
getMaxDelayBetweenRetries();
+    public static final int NUMBER_OF_RETRIES = getNumberOfRetries();
+    public static final long MAX_DELAY_BETWEEN_RETRIES = 
getMaxDelayBetweenRetries();
 
     private static final ICloudRetryPredicate RETRY_ALWAYS_PREDICATE = e -> 
true;
     private static final ICloudBeforeRetryRequest NO_OP_BEFORE_RETRY = () -> {

Reply via email to