This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 848c4bc3657ca559222597bcc614cba8bfd1c3ca Author: Ritik Raj <[email protected]> AuthorDate: Fri Nov 7 22:08:24 2025 +0530 [ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel downloader - user model changes: no - storage format changes: no - interface changes: no Details: Added retry with ExponentialRetryPolicy() for downloadDirectories in ParallelDownloader. Ext-ref: MB-69226 Change-Id: Ica47227218a323ededced75691d3f64070c97729 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Ritik Raj <[email protected]> --- .../asterix/cloud/AbstractCloudIOManager.java | 4 +- .../apache/asterix/cloud/EagerCloudIOManager.java | 2 +- .../apache/asterix/cloud/LazyCloudIOManager.java | 2 +- .../cloud/clients/AbstractParallelDownloader.java | 84 ++++++++++++++++++++++ .../asterix/cloud/clients/IParallelDownloader.java | 2 +- .../cloud/clients/aws/s3/S3ParallelDownloader.java | 17 ++--- .../cloud/clients/aws/s3/S3SyncDownloader.java | 14 ++-- .../azure/blobstorage/AzureParallelDownloader.java | 9 ++- .../clients/google/gcs/GCSParallelDownloader.java | 7 +- .../cloud/util/CloudRetryableRequestUtil.java | 4 +- 10 files changed, 108 insertions(+), 37 deletions(-) diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 6fc271e382..23df3499d0 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -192,7 +192,7 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti public void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException { try (IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager)) { LOGGER.info("Downloading all files located in {}", libPath); - downloader.downloadDirectories(libPath); + downloader.downloadDirectoriesWithRetry(libPath); LOGGER.info("Finished downloading {}", libPath); } } @@ -202,7 +202,7 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti FileReference appDir = resolveAbsolutePath( localIoManager.getWorkspacePath(0).getPath() + File.separator + APPLICATION_ROOT_DIR_NAME); LOGGER.info("Downloading all libraries in + {}", appDir); - downloader.downloadDirectories(Collections.singletonList(appDir)); + downloader.downloadDirectoriesWithRetry(Collections.singletonList(appDir)); LOGGER.info("Finished downloading all libraries"); } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java index 1cb6077c70..59e21ac14c 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java @@ -62,7 +62,7 @@ final class EagerCloudIOManager extends AbstractCloudIOManager { protected void downloadPartitions(boolean metadataNode, int metadataPartition) throws HyracksDataException { IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager); LOGGER.info("Downloading all files located in {}", partitionPaths); - downloader.downloadDirectories(partitionPaths); + downloader.downloadDirectoriesWithRetry(partitionPaths); LOGGER.info("Finished downloading {}", partitionPaths); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java index 1c5efd9c32..999a6b6698 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java @@ -156,7 +156,7 @@ final class LazyCloudIOManager extends AbstractCloudIOManager { LOGGER.info("Downloading metadata partition {}, Current uncached files: {}", metadataPartition, uncachedFiles); FileReference metadataDir = resolve(STORAGE_ROOT_DIR_NAME + File.separator + partitionDir); - downloader.downloadDirectories(Collections.singleton(metadataDir)); + downloader.downloadDirectoriesWithRetry(Collections.singleton(metadataDir)); uncachedFiles.removeIf(f -> f.getRelativePath().contains(partitionDir)); LOGGER.info("Finished downloading metadata partition. Current uncached files: {}", uncachedFiles); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java new file mode 100644 index 0000000000..29a50a6d14 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil; +import org.apache.hyracks.util.ExponentialRetryPolicy; +import org.apache.hyracks.util.IRetryPolicy; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class AbstractParallelDownloader implements IParallelDownloader { + private static final Logger LOGGER = LogManager.getLogger(); + + public void downloadDirectoriesWithRetry(Collection<FileReference> toDownload) throws HyracksDataException { + Set<FileReference> failedFiles = new HashSet<>(toDownload); + IRetryPolicy retryPolicy = new ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES, + CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES); + int attempt = 1; + while (true) { + try { + failedFiles = downloadDirectories(toDownload); + + if (failedFiles.isEmpty()) { + return; + } + + if (!retryPolicy.retry(null)) { + LOGGER.error("Exhausted retries ({}) — failed to download {} directories: {}", + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size(), failedFiles); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION); + } + + LOGGER.warn("Failed to download directories (attempt {}/{}), retrying. Remaining: {}", attempt, + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size()); + } catch (IOException | ExecutionException | InterruptedException e) { + if (ExceptionUtils.causedByInterrupt(e) && !Thread.currentThread().isInterrupted()) { + LOGGER.warn("Lost suppressed interrupt during downloadDirectory retry", e); + throw HyracksDataException.create(e); + } + try { + if (!retryPolicy.retry(e)) { + LOGGER.error("Exhausted retries ({}) — failed to download {} directories: {}", + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size(), failedFiles); + throw HyracksDataException.create(e); + } + } catch (InterruptedException e1) { + throw HyracksDataException.create(e1); + } + LOGGER.warn("Failed to downloadDirectories, performing {}/{}", attempt, + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, e); + } + attempt++; + } + } + + protected abstract Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) + throws ExecutionException, InterruptedException, IOException; +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java index 6f1c4536e1..5f7ff0bf4a 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java @@ -39,7 +39,7 @@ public interface IParallelDownloader extends AutoCloseable { * @param toDownload all files to be downloaded * @return file that failed to download */ - Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) throws HyracksDataException; + void downloadDirectoriesWithRetry(Collection<FileReference> toDownload) throws HyracksDataException; /** * Close the downloader and release all of its resources diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java index 0321a35329..bfb52c9f70 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java @@ -30,7 +30,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -57,7 +57,7 @@ import software.amazon.awssdk.transfer.s3.model.FileDownload; import software.amazon.awssdk.utils.AttributeMap; @ThreadSafe -class S3ParallelDownloader implements IParallelDownloader { +class S3ParallelDownloader extends AbstractParallelDownloader { private final String bucket; private final IOManager ioManager; private final S3AsyncClient s3AsyncClient; @@ -84,17 +84,10 @@ class S3ParallelDownloader implements IParallelDownloader { } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) - throws HyracksDataException { - Set<FileReference> failedFiles; + public Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) + throws HyracksDataException, ExecutionException, InterruptedException { List<CompletableFuture<CompletedDirectoryDownload>> downloads = startDownloadingDirectories(toDownload); - try { - failedFiles = waitForDirectoryDownloads(downloads); - } catch (ExecutionException | InterruptedException e) { - throw HyracksDataException.create(e); - } - - return failedFiles; + return waitForDirectoryDownloads(downloads); } @Override diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java index 53e3dadbc2..498c34b5dd 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java @@ -33,7 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -50,7 +50,7 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.S3Object; @ThreadSafe -public class S3SyncDownloader implements IParallelDownloader { +public class S3SyncDownloader extends AbstractParallelDownloader { private static final Logger LOGGER = LogManager.getLogger(); private final String bucket; @@ -126,14 +126,10 @@ public class S3SyncDownloader implements IParallelDownloader { } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) - throws HyracksDataException { + public Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) + throws IOException, ExecutionException, InterruptedException { Set<FileReference> failedFiles; - try { - failedFiles = downloadDirectoriesAndWait(toDownload); - } catch (IOException | InterruptedException | ExecutionException e) { - throw HyracksDataException.create(e); - } + failedFiles = downloadDirectoriesAndWait(toDownload); return failedFiles; } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java index 364fb2a313..12d8a192ca 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java @@ -29,7 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -44,7 +44,7 @@ import com.azure.storage.blob.models.ListBlobsOptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class AzureParallelDownloader implements IParallelDownloader { +public class AzureParallelDownloader extends AbstractParallelDownloader { private final IOManager ioManager; private final BlobContainerAsyncClient blobContainerAsyncClient; private final IRequestProfilerLimiter profiler; @@ -101,8 +101,7 @@ public class AzureParallelDownloader implements IParallelDownloader { } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> directories) - throws HyracksDataException { + public Set<FileReference> downloadDirectories(Collection<FileReference> directories) throws HyracksDataException { Set<FileReference> failedFiles = new HashSet<>(); List<Mono<Void>> directoryDownloads = new ArrayList<>(); @@ -196,4 +195,4 @@ public class AzureParallelDownloader implements IParallelDownloader { throw HyracksDataException.create(e); } } -} \ No newline at end of file +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java index b9e7eeea5c..574a9b6ebb 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -51,7 +51,7 @@ import com.google.cloud.storage.transfermanager.TransferManager; import com.google.cloud.storage.transfermanager.TransferManagerConfig; import com.google.cloud.storage.transfermanager.TransferStatus; -public class GCSParallelDownloader implements IParallelDownloader { +public class GCSParallelDownloader extends AbstractParallelDownloader { private final String bucket; private final IOManager ioManager; @@ -102,8 +102,7 @@ public class GCSParallelDownloader implements IParallelDownloader { } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) - throws HyracksDataException { + public Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) { Set<FileReference> failedFiles = new HashSet<>(); ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix()); diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java index e01190035e..5461ed5bd4 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java @@ -53,8 +53,8 @@ public class CloudRetryableRequestUtil { private static final int UNSTABLE_NUMBER_OF_RETRIES = 100; private static final int UNSTABLE_MAX_DELAY_BETWEEN_RETRIES_IN_MILLIS = 0; private static final Logger LOGGER = LogManager.getLogger(); - private static final int NUMBER_OF_RETRIES = getNumberOfRetries(); - private static final long MAX_DELAY_BETWEEN_RETRIES = getMaxDelayBetweenRetries(); + public static final int NUMBER_OF_RETRIES = getNumberOfRetries(); + public static final long MAX_DELAY_BETWEEN_RETRIES = getMaxDelayBetweenRetries(); private static final ICloudRetryPredicate RETRY_ALWAYS_PREDICATE = e -> true; private static final ICloudBeforeRetryRequest NO_OP_BEFORE_RETRY = () -> {
