This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f0f0509fb0aaa99858abb35708a719d42501bd88 Author: Ritik Raj <[email protected]> AuthorDate: Mon Nov 10 15:35:33 2025 +0530 [NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel - user model changes: no - storage format changes: no - interface changes: no Details: The Azure SDK uses Reactor/Netty for async downloads, where each download is a Mono and the downloads are merged with flatMap. When one Mono fails, flatMap cancels the others, causing Azure to release their buffers while Netty may still be writing into them, which leads to an IllegalReferenceCountException. Switching to flatMapDelayError defers error propagation until the remaining downloads have terminated, which prevents the premature cancellation and lets every in-flight download finish safely before the error is reported. In addition, each download is now wrapped in retryWhen with a Reactor Retry policy (the new ReactiveExponentialRetryPolicy) that mirrors the blocking ExponentialRetryPolicy; getters for the max retries, initial delay, and max delay were added to ExponentialRetryPolicy to support this. Ext-ref: MB-69283 Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565 Tested-by: Ritik Raj <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../azure/blobstorage/AzureParallelDownloader.java | 12 ++- .../ReactiveExponentialRetryPolicy.java | 91 ++++++++++++++++++++++ .../hyracks/util/ExponentialRetryPolicy.java | 12 +++ 3 files changed, 112 insertions(+), 3 deletions(-) diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java index 12d8a192ca..57f044d450 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java @@ -35,6 +35,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.nc.io.IOManager; +import 
org.apache.hyracks.util.ExponentialRetryPolicy; import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.BlobContainerAsyncClient; @@ -43,12 +44,14 @@ import com.azure.storage.blob.models.ListBlobsOptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; public class AzureParallelDownloader extends AbstractParallelDownloader { private final IOManager ioManager; private final BlobContainerAsyncClient blobContainerAsyncClient; private final IRequestProfilerLimiter profiler; private final AzBlobStorageClientConfig config; + private final Retry retryPolicy; public AzureParallelDownloader(IOManager ioManager, BlobContainerAsyncClient blobContainerAsyncClient, IRequestProfilerLimiter profiler, AzBlobStorageClientConfig config) { @@ -56,6 +59,7 @@ public class AzureParallelDownloader extends AbstractParallelDownloader { this.blobContainerAsyncClient = blobContainerAsyncClient; this.profiler = profiler; this.config = config; + this.retryPolicy = ReactiveExponentialRetryPolicy.retryPolicy(new ExponentialRetryPolicy()); } @Override @@ -96,8 +100,9 @@ public class AzureParallelDownloader extends AbstractParallelDownloader { } private void waitForFileDownloads(List<Mono<Void>> downloads) throws HyracksDataException { - runBlockingWithExceptionHandling( - () -> Flux.fromIterable(downloads).flatMap(mono -> mono, downloads.size()).then().block()); + runBlockingWithExceptionHandling(() -> Flux.fromIterable(downloads) + .flatMapDelayError(mono -> mono.retryWhen(retryPolicy), downloads.size(), downloads.size()).then() + .block()); } @Override @@ -111,8 +116,9 @@ public class AzureParallelDownloader extends AbstractParallelDownloader { directoryDownloads.add(directoryTask); } + int concurrency = config.getRequestsMaxPendingHttpConnections(); runBlockingWithExceptionHandling(() -> Flux.fromIterable(directoryDownloads) - .flatMap(mono -> mono, config.getRequestsMaxPendingHttpConnections()).then().block()); + 
.flatMapDelayError(mono -> mono.retryWhen(retryPolicy), concurrency, concurrency).then().block()); return failedFiles; } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java new file mode 100644 index 0000000000..ba2e3d4499 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients.azure.blobstorage; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil; +import org.apache.hyracks.util.ExponentialRetryPolicy; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import reactor.util.retry.Retry; + +/** + * Utility methods for building Reactor {@link Retry} policies that behave similarly to the + * blocking {@link ExponentialRetryPolicy}. 
+ */ +public class ReactiveExponentialRetryPolicy { + + private static final Logger LOGGER = LogManager.getLogger(); + + private ReactiveExponentialRetryPolicy() { + } + + /** + * Creates a {@link Retry} instance based on an {@link ExponentialRetryPolicy}. + * + * @param policy the blocking policy to mirror. If {@code null}, defaults are used. + * @return a Reactor {@link Retry} behaving similarly to the provided policy. + */ + public static Retry retryPolicy(ExponentialRetryPolicy policy) { + ExponentialRetryPolicy effectivePolicy = policy != null ? policy + : new ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES, + CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES); + long initialDelay = Math.max(0L, effectivePolicy.getInitialDelay()); + long maxDelay = Math.max(0L, effectivePolicy.getMaxDelay()); + int maxRetries = Math.max(0, effectivePolicy.getMaxRetries()); + long maxAttempts = Math.max(1L, (long) maxRetries + 1L); + return Retry.backoff(maxAttempts, Duration.ofMillis(initialDelay)).maxBackoff(Duration.ofMillis(maxDelay)) + .filter(ReactiveExponentialRetryPolicy::isRetryable).doBeforeRetry(signal -> { + long retriesSoFar = signal.totalRetries(); + long delayMillis = computeDelayMillis(initialDelay, maxDelay, retriesSoFar); + long attempt = retriesSoFar + 1; + LOGGER.info("Retrying after {}ms, attempt {}/{}", delayMillis, attempt, maxRetries); + }).transientErrors(true); + } + + private static long computeDelayMillis(long initialDelay, long maxDelay, long retriesSoFar) { + if (initialDelay <= 0 || maxDelay <= 0) { + return 0L; + } + + long delay = initialDelay; + for (long i = 0; i < retriesSoFar; i++) { + delay = delay > maxDelay / 2 ? 
maxDelay : delay * 2; + } + + long jitteredDelay = ThreadLocalRandom.current().nextLong(1, delay + 1); + + return Math.min(jitteredDelay, maxDelay); + } + + private static boolean isRetryable(Throwable error) { + if (error instanceof IllegalArgumentException) { + return false; + } + if (ExceptionUtils.causedByInterrupt(error)) { + Thread.currentThread().interrupt(); + return false; + } + return true; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java index 8967e3e28c..385525ca08 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java @@ -101,6 +101,18 @@ public class ExponentialRetryPolicy implements IRetryPolicy { return false; } + public int getMaxRetries() { + return maxRetries; + } + + public long getInitialDelay() { + return delay; + } + + public long getMaxDelay() { + return maxDelay; + } + private static boolean isUnstable() { return Boolean.getBoolean(CLOUD_UNSTABLE_MODE); }
