This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ccaa970361e448405f295c7e73895696bfdceed4
Author: Penghui Li <[email protected]>
AuthorDate: Fri Jan 16 01:55:31 2026 -0800

    [fix][ml] Retry offload reads when OffloadReadHandleClosedException is encountered (#25148)
    
    (cherry picked from commit 16bcec35cb99b8f9666e1517b1af1be1f5d40c57)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   5 ++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    |  36 ++++++--
 .../impl/cache/RangeEntryCacheImplTest.java        | 100 +++++++++++++++++++++
 3 files changed, 133 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ea8fc4d3a64..5b692c3f3be 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2087,6 +2087,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return Optional.ofNullable(ledgers.get(ledgerId));
     }
 
+    public CompletableFuture<ReadHandle> reopenReadHandle(long ledgerId) {
+        invalidateReadHandle(ledgerId);
+        return getLedgerHandle(ledgerId);
+    }
+
     CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
         CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
         if (ledgerHandle != null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index ad5630c078a..93277088669 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -32,6 +32,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
@@ -47,6 +48,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
 import org.apache.bookkeeper.mledger.util.RangeCache;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -442,6 +444,11 @@ public class RangeEntryCacheImpl implements EntryCache {
      */
     CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
                                                        long firstEntry, long lastEntry, boolean shouldCacheEntry) {
+        return readFromStorage(lh, firstEntry, lastEntry, shouldCacheEntry, true);
+    }
+
+    private CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
+                                                          boolean shouldCacheEntry, boolean allowRetry) {
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
         CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
                 .thenApply(
@@ -473,17 +480,30 @@ public class RangeEntryCacheImpl implements EntryCache {
                                 ledgerEntries.close();
                             }
                         });
-        // handle LH invalidation
-        readResult.exceptionally(exception -> {
-            if (exception instanceof BKException
-                    && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
-            } else {
+
+        return readResult.handle((entries, exception) -> {
+            if (exception == null) {
+                return CompletableFuture.completedFuture(entries);
+            }
+
+            Throwable cause = FutureUtil.unwrapCompletionException(exception);
+            if (allowRetry && cause instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
+                log.info("[{}] Read handle closed for ledger {}, reopening", ml.getName(), lh.getId());
+                pendingReadsManager.invalidateLedger(lh.getId());
+                return ml.reopenReadHandle(lh.getId())
+                        .thenCompose(reopened -> readFromStorage(reopened, firstEntry, lastEntry,
+                                shouldCacheEntry, false));
+            }
+
+            if (!(cause instanceof BKException
+                    && ((BKException) cause).getCode() == BKException.Code.TooManyRequestsException)) {
                 ml.invalidateLedgerHandle(lh);
                 pendingReadsManager.invalidateLedger(lh.getId());
             }
-            return null;
-        });
-        return readResult;
+
+            CompletableFuture<List<EntryImpl>> failedFuture = CompletableFuture.failedFuture(cause);
+            return failedFuture;
+        }).thenCompose(Function.identity());
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
new file mode 100644
index 00000000000..7590346e209
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.Unpooled;
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.testng.annotations.Test;
+
+public class RangeEntryCacheImplTest {
+    @Test
+    public void testReadFromStorageRetriesWhenHandleClosed() {
+        ManagedLedgerFactoryImpl mockFactory = mock(ManagedLedgerFactoryImpl.class);
+        ManagedLedgerFactoryMBeanImpl mockFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class);
+        when(mockFactory.getMbean()).thenReturn(mockFactoryMBean);
+        when(mockFactory.getConfig()).thenReturn(new ManagedLedgerFactoryConfig());
+        RangeEntryCacheManagerImpl mockEntryCacheManager = spy(new RangeEntryCacheManagerImpl(mockFactory, mock(
+                OrderedScheduler.class), OpenTelemetry.noop()));
+        ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class);
+        ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class);
+        when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean);
+        when(mockManagedLedger.getName()).thenReturn("testManagedLedger");
+        when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class));
+        when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn(Optional.empty());
+        InflightReadsLimiter inflightReadsLimiter = mock(InflightReadsLimiter.class);
+        when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter);
+        doAnswer(invocation -> {
+            long permits = invocation.getArgument(0);
+            InflightReadsLimiter.Handle handle = new InflightReadsLimiter.Handle(permits, System.currentTimeMillis(),
+                    true);
+            return Optional.of(handle);
+        }).when(inflightReadsLimiter).acquire(anyLong(), any());
+
+        RangeEntryCacheImpl cache = new RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false);
+
+        ReadHandle readHandle = mock(ReadHandle.class);
+        when(readHandle.getId()).thenReturn(1L);
+        when(mockManagedLedger.reopenReadHandle(1L)).thenReturn(CompletableFuture.completedFuture(readHandle));
+
+        LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(1L, 0L, 1, Unpooled.wrappedBuffer(new byte[]{1}));
+        LedgerEntries ledgerEntries = mock(LedgerEntries.class);
+        List<LedgerEntry> entryList = List.of((LedgerEntry) ledgerEntry);
+        when(ledgerEntries.iterator()).thenReturn(entryList.iterator());
+
+        AtomicInteger readAttempts = new AtomicInteger();
+        when(readHandle.readAsync(0L, 0L)).thenAnswer(invocation -> {
+            if (readAttempts.getAndIncrement() == 0) {
+                return CompletableFuture.failedFuture(new ManagedLedgerException.OffloadReadHandleClosedException());
+            }
+            return CompletableFuture.completedFuture(ledgerEntries);
+        });
+
+        CompletableFuture<List<EntryImpl>> future = cache.readFromStorage(readHandle, 0L, 0L, true);
+        assertThat(future).isCompleted().satisfies(f -> {
+            List<EntryImpl> entries = f.getNow(null);
+            assertThat(entries).hasSize(1);
+            assertThat(entries.get(0).getLedgerId()).isEqualTo(1L);
+            assertThat(entries.get(0).getEntryId()).isEqualTo(0L);
+        });
+        assertThat(readAttempts.get()).isEqualTo(2);
+    }
+}
\ No newline at end of file

Reply via email to