This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5c98027227cc57a1a1d401fdb7282532c414a28c
Author: Penghui Li <[email protected]>
AuthorDate: Fri Jan 16 01:55:31 2026 -0800

    [fix][ml] Retry offload reads when OffloadReadHandleClosedException is 
encountered (#25148)
    
    (cherry picked from commit 16bcec35cb99b8f9666e1517b1af1be1f5d40c57)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  5 ++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 35 ++++++++++----
 .../impl/cache/RangeEntryCacheImplTest.java        | 56 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 8 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a8b41821544..06a3a5a4e56 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2183,6 +2183,11 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         return Optional.ofNullable(ledgers.get(ledgerId));
     }
 
+    public CompletableFuture<ReadHandle> reopenReadHandle(long ledgerId) {
+        invalidateReadHandle(ledgerId);
+        return getLedgerHandle(ledgerId);
+    }
+
     CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
         CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
         if (ledgerHandle != null) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index f8ee2c431b1..4dc60b6434f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -32,6 +32,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
 import java.util.function.IntSupplier;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -519,6 +520,11 @@ public class RangeEntryCacheImpl implements EntryCache {
      */
     CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long 
firstEntry, long lastEntry,
                                                    IntSupplier 
expectedReadCount) {
+        return readFromStorage(lh, firstEntry, lastEntry, expectedReadCount, 
true);
+    }
+
+    private CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long 
firstEntry, long lastEntry,
+                                                          IntSupplier 
expectedReadCount, boolean allowRetry) {
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
         CompletableFuture<List<Entry>> readResult = 
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
                 .thenApply(
@@ -550,17 +556,30 @@ public class RangeEntryCacheImpl implements EntryCache {
                                 ledgerEntries.close();
                             }
                         });
-        // handle LH invalidation
-        readResult.exceptionally(exception -> {
-            if (exception instanceof BKException
-                    && ((BKException) exception).getCode() == 
BKException.Code.TooManyRequestsException) {
-            } else {
+
+        return readResult.handle((entries, exception) -> {
+            if (exception == null) {
+                return CompletableFuture.completedFuture(entries);
+            }
+
+            Throwable cause = FutureUtil.unwrapCompletionException(exception);
+            if (allowRetry && cause instanceof 
ManagedLedgerException.OffloadReadHandleClosedException) {
+                log.info("[{}] Read handle closed for ledger {}, reopening", 
ml.getName(), lh.getId());
+                pendingReadsManager.invalidateLedger(lh.getId());
+                return ml.reopenReadHandle(lh.getId())
+                        .thenCompose(reopened -> readFromStorage(reopened, 
firstEntry, lastEntry, expectedReadCount,
+                                false));
+            }
+
+            if (!(cause instanceof BKException
+                    && ((BKException) cause).getCode() == 
BKException.Code.TooManyRequestsException)) {
                 ml.invalidateLedgerHandle(lh);
                 pendingReadsManager.invalidateLedger(lh.getId());
             }
-            return null;
-        });
-        return readResult;
+
+            CompletableFuture<List<Entry>> failedFuture = 
CompletableFuture.failedFuture(cause);
+            return failedFuture;
+        }).thenCompose(Function.identity());
     }
 
     @Override
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
index 2922a3d2679..42ae262e8dd 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
@@ -32,8 +32,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntSupplier;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -183,6 +187,58 @@ public class RangeEntryCacheImplTest {
         verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), 
eq(99L), any(), any(), any());
     }
 
+    @Test
+    public void testReadFromStorageRetriesWhenHandleClosed() {
+        RangeEntryCacheManagerImpl mockEntryCacheManager = 
mock(RangeEntryCacheManagerImpl.class);
+        ManagedLedgerFactoryMBeanImpl mlFactoryMBean = 
mock(ManagedLedgerFactoryMBeanImpl.class);
+        
when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean);
+        ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class);
+        ManagedLedgerMBeanImpl mockManagedLedgerMBean = 
mock(ManagedLedgerMBeanImpl.class);
+        when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean);
+        when(mockManagedLedger.getName()).thenReturn("testManagedLedger");
+        
when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class));
+        
when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn(Optional.empty());
+        RangeCacheRemovalQueue mockRangeCacheRemovalQueue = 
mock(RangeCacheRemovalQueue.class);
+        when(mockRangeCacheRemovalQueue.addEntry(any())).thenReturn(true);
+        InflightReadsLimiter inflightReadsLimiter = 
mock(InflightReadsLimiter.class);
+        
when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter);
+        doAnswer(invocation -> {
+            long permits = invocation.getArgument(0);
+            InflightReadsLimiter.Handle handle = new 
InflightReadsLimiter.Handle(permits, System.currentTimeMillis(),
+                    true);
+            return Optional.of(handle);
+        }).when(inflightReadsLimiter).acquire(anyLong(), any());
+
+        RangeEntryCacheImpl cache = new 
RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false,
+                mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT, 
mock(PendingReadsManager.class));
+
+        ReadHandle readHandle = mock(ReadHandle.class);
+        when(readHandle.getId()).thenReturn(1L);
+        
when(mockManagedLedger.reopenReadHandle(1L)).thenReturn(CompletableFuture.completedFuture(readHandle));
+
+        LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(1L, 0L, 1, 
Unpooled.wrappedBuffer(new byte[] {1}));
+        LedgerEntries ledgerEntries = mock(LedgerEntries.class);
+        List<LedgerEntry> entryList = List.of((LedgerEntry) ledgerEntry);
+        when(ledgerEntries.iterator()).thenReturn(entryList.iterator());
+
+        AtomicInteger readAttempts = new AtomicInteger();
+        when(readHandle.readAsync(0L, 0L)).thenAnswer(invocation -> {
+            if (readAttempts.getAndIncrement() == 0) {
+                return CompletableFuture.failedFuture(new 
ManagedLedgerException.OffloadReadHandleClosedException());
+            }
+            return CompletableFuture.completedFuture(ledgerEntries);
+        });
+
+        CompletableFuture<List<Entry>> future = 
cache.readFromStorage(readHandle, 0L, 0L, () -> 1);
+        assertThat(future).isCompleted().satisfies(f -> {
+            List<Entry> entries = f.getNow(null);
+            assertThat(entries).hasSize(1);
+            assertThat(entries.get(0).getLedgerId()).isEqualTo(1L);
+            assertThat(entries.get(0).getEntryId()).isEqualTo(0L);
+        });
+        assertThat(readAttempts.get()).isEqualTo(2);
+    }
+
     private void performReadAndValidateResult() {
         CompletableFuture<List<Entry>> future = new CompletableFuture<>();
         rangeEntryCache.asyncReadEntry(lh, 0, 99, expectedReadCount, new 
AsyncCallbacks.ReadEntriesCallback() {

Reply via email to