This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit f0f0509fb0aaa99858abb35708a719d42501bd88
Author: Ritik Raj <[email protected]>
AuthorDate: Mon Nov 10 15:35:33 2025 +0530

    [NO ISSUE][CLOUD] Fix premature buffer release caused by flatMap cancel
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Azure SDK uses Reactor/Netty for async downloads, where each download
    is a Mono merged with flatMap. When one Mono fails, flatMap cancels
    others, causing Azure to release buffers while Netty may still write
    into them, leading to IllegalReferenceCountException. Switching to
    flatMapDelayError defers error propagation and prevents premature
    cancellation, allowing all downloads to complete safely.
    
    Ext-ref: MB-69283
    Change-Id: If0c3577225a0dffffadfab13321a9c8702c3551e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20565
    Tested-by: Ritik Raj <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../azure/blobstorage/AzureParallelDownloader.java | 12 ++-
 .../ReactiveExponentialRetryPolicy.java            | 91 ++++++++++++++++++++++
 .../hyracks/util/ExponentialRetryPolicy.java       | 12 +++
 3 files changed, 112 insertions(+), 3 deletions(-)

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
index 12d8a192ca..57f044d450 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -35,6 +35,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
 
 import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.BlobContainerAsyncClient;
@@ -43,12 +44,14 @@ import com.azure.storage.blob.models.ListBlobsOptions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 public class AzureParallelDownloader extends AbstractParallelDownloader {
     private final IOManager ioManager;
     private final BlobContainerAsyncClient blobContainerAsyncClient;
     private final IRequestProfilerLimiter profiler;
     private final AzBlobStorageClientConfig config;
+    private final Retry retryPolicy;
 
     public AzureParallelDownloader(IOManager ioManager, 
BlobContainerAsyncClient blobContainerAsyncClient,
             IRequestProfilerLimiter profiler, AzBlobStorageClientConfig 
config) {
@@ -56,6 +59,7 @@ public class AzureParallelDownloader extends AbstractParallelDownloader {
         this.blobContainerAsyncClient = blobContainerAsyncClient;
         this.profiler = profiler;
         this.config = config;
+        this.retryPolicy = ReactiveExponentialRetryPolicy.retryPolicy(new 
ExponentialRetryPolicy());
     }
 
     @Override
@@ -96,8 +100,9 @@ public class AzureParallelDownloader extends AbstractParallelDownloader {
     }
 
     private void waitForFileDownloads(List<Mono<Void>> downloads) throws 
HyracksDataException {
-        runBlockingWithExceptionHandling(
-                () -> Flux.fromIterable(downloads).flatMap(mono -> mono, 
downloads.size()).then().block());
+        runBlockingWithExceptionHandling(() -> Flux.fromIterable(downloads)
+                .flatMapDelayError(mono -> mono.retryWhen(retryPolicy), 
downloads.size(), downloads.size()).then()
+                .block());
     }
 
     @Override
@@ -111,8 +116,9 @@ public class AzureParallelDownloader extends AbstractParallelDownloader {
             directoryDownloads.add(directoryTask);
         }
 
+        int concurrency = config.getRequestsMaxPendingHttpConnections();
         runBlockingWithExceptionHandling(() -> 
Flux.fromIterable(directoryDownloads)
-                .flatMap(mono -> mono, 
config.getRequestsMaxPendingHttpConnections()).then().block());
+                .flatMapDelayError(mono -> mono.retryWhen(retryPolicy), 
concurrency, concurrency).then().block());
 
         return failedFiles;
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java
new file mode 100644
index 0000000000..ba2e3d4499
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/ReactiveExponentialRetryPolicy.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.hyracks.util.ExponentialRetryPolicy;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import reactor.util.retry.Retry;
+
+/**
+ * Utility methods for building Reactor {@link Retry} policies that behave similarly to the
+ * blocking {@link ExponentialRetryPolicy}.
+ */
+public class ReactiveExponentialRetryPolicy {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private ReactiveExponentialRetryPolicy() {
+    }
+
+    /**
+     * Creates a {@link Retry} instance based on an {@link ExponentialRetryPolicy}.
+     * <p>
+     * The returned spec allows at most {@code policy.getMaxRetries()} retries on top of the initial
+     * subscription, backs off exponentially starting at {@code policy.getInitialDelay()} ms with an upper
+     * bound of {@code policy.getMaxDelay()} ms, and only retries errors accepted by {@code isRetryable}.
+     * Negative values read from the policy are treated as zero.
+     *
+     * @param policy the blocking policy to mirror. If {@code null}, defaults are used.
+     * @return a Reactor {@link Retry} behaving similarly to the provided policy.
+     */
+    public static Retry retryPolicy(ExponentialRetryPolicy policy) {
+        ExponentialRetryPolicy effectivePolicy = policy != null ? policy
+                : new ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES,
+                        CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES);
+        long initialDelay = Math.max(0L, effectivePolicy.getInitialDelay());
+        long maxDelay = Math.max(0L, effectivePolicy.getMaxDelay());
+        int maxRetries = Math.max(0, effectivePolicy.getMaxRetries());
+        // Retry.backoff's first argument is the number of *retries* allowed (the initial subscription is not
+        // counted), so maxRetries is passed as-is; adding one would permit an extra retry and make the log
+        // below report "attempt N+1/N".
+        return Retry.backoff(maxRetries, Duration.ofMillis(initialDelay)).maxBackoff(Duration.ofMillis(maxDelay))
+                .filter(ReactiveExponentialRetryPolicy::isRetryable).doBeforeRetry(signal -> {
+                    // transientErrors(true) makes the spec work off the consecutive-failure counter, so the
+                    // log reports that same counter.
+                    long retriesSoFar = signal.totalRetriesInARow();
+                    long backoffMillis = nominalBackoffMillis(initialDelay, maxDelay, retriesSoFar);
+                    // The real delay is jittered inside Reactor and is not observable here; log the nominal
+                    // value instead of a separately drawn random number.
+                    LOGGER.info("Retrying after ~{}ms (before jitter), attempt {}/{}", backoffMillis,
+                            retriesSoFar + 1, maxRetries);
+                }).transientErrors(true);
+    }
+
+    /**
+     * Computes the nominal (pre-jitter) exponential backoff for the upcoming retry. Used for logging only.
+     *
+     * @param initialDelay backoff for the first retry, in milliseconds
+     * @param maxDelay     upper bound for the backoff, in milliseconds
+     * @param retriesSoFar number of retries already performed (0 for the first retry)
+     * @return {@code initialDelay} doubled {@code retriesSoFar} times and capped at {@code maxDelay};
+     *         0 if either delay is not positive
+     */
+    private static long nominalBackoffMillis(long initialDelay, long maxDelay, long retriesSoFar) {
+        if (initialDelay <= 0 || maxDelay <= 0) {
+            return 0L;
+        }
+
+        long delay = Math.min(initialDelay, maxDelay);
+        // stop as soon as the cap is reached; comparing against maxDelay / 2 avoids overflow when doubling
+        for (long i = 0; i < retriesSoFar && delay < maxDelay; i++) {
+            delay = delay > maxDelay / 2 ? maxDelay : delay * 2;
+        }
+        return delay;
+    }
+
+    /**
+     * Decides whether a failed download should be retried.
+     *
+     * @param error the failure reported by the upstream publisher
+     * @return {@code false} for {@link IllegalArgumentException} and for failures caused by an interrupt,
+     *         {@code true} otherwise
+     */
+    private static boolean isRetryable(Throwable error) {
+        if (error instanceof IllegalArgumentException) {
+            return false;
+        }
+        if (ExceptionUtils.causedByInterrupt(error)) {
+            // NOTE(review): this re-interrupts whichever thread evaluates the predicate; confirm that this
+            // is the intended thread when the error is delivered asynchronously.
+            Thread.currentThread().interrupt();
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
index 8967e3e28c..385525ca08 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExponentialRetryPolicy.java
@@ -101,6 +101,18 @@ public class ExponentialRetryPolicy implements IRetryPolicy {
         return false;
     }
 
+    /**
+     * @return the value of the {@code maxRetries} field of this policy
+     */
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    /**
+     * NOTE(review): this returns the {@code delay} field. If that field is advanced while the policy is
+     * retrying, the value is the current delay rather than the initial one -- confirm against the field's
+     * usage in this class.
+     *
+     * @return the value of the {@code delay} field of this policy
+     */
+    public long getInitialDelay() {
+        return delay;
+    }
+
+    /**
+     * @return the value of the {@code maxDelay} field of this policy
+     */
+    public long getMaxDelay() {
+        return maxDelay;
+    }
+
     private static boolean isUnstable() {
         return Boolean.getBoolean(CLOUD_UNSTABLE_MODE);
     }

Reply via email to