This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5c98027227cc57a1a1d401fdb7282532c414a28c
Author: Penghui Li <[email protected]>
AuthorDate: Fri Jan 16 01:55:31 2026 -0800

    [fix][ml] Retry offload reads when OffloadReadHandleClosedException is encountered (#25148)

    (cherry picked from commit 16bcec35cb99b8f9666e1517b1af1be1f5d40c57)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  5 ++
 .../mledger/impl/cache/RangeEntryCacheImpl.java    | 35 ++++++++++----
 .../impl/cache/RangeEntryCacheImplTest.java        | 56 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a8b41821544..06a3a5a4e56 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2183,6 +2183,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         return Optional.ofNullable(ledgers.get(ledgerId));
     }
 
+    public CompletableFuture<ReadHandle> reopenReadHandle(long ledgerId) {
+        invalidateReadHandle(ledgerId);
+        return getLedgerHandle(ledgerId);
+    }
+
     CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
         CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
         if (ledgerHandle != null) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index f8ee2c431b1..4dc60b6434f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -32,6 +32,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
 import java.util.function.IntSupplier;
 import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -519,6 +520,11 @@ public class RangeEntryCacheImpl implements EntryCache {
      */
     CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
                                                    IntSupplier expectedReadCount) {
+        return readFromStorage(lh, firstEntry, lastEntry, expectedReadCount, true);
+    }
+
+    private CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry,
+                                                           IntSupplier expectedReadCount, boolean allowRetry) {
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
         CompletableFuture<List<Entry>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
                 .thenApply(
@@ -550,17 +556,30 @@ public class RangeEntryCacheImpl implements EntryCache {
                                 ledgerEntries.close();
                             }
                         });
-        // handle LH invalidation
-        readResult.exceptionally(exception -> {
-            if (exception instanceof BKException
-                    && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) {
-            } else {
+
+        return readResult.handle((entries, exception) -> {
+            if (exception == null) {
+                return CompletableFuture.completedFuture(entries);
+            }
+
+            Throwable cause = FutureUtil.unwrapCompletionException(exception);
+            if (allowRetry && cause instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
+                log.info("[{}] Read handle closed for ledger {}, reopening", ml.getName(), lh.getId());
+                pendingReadsManager.invalidateLedger(lh.getId());
+                return ml.reopenReadHandle(lh.getId())
+                        .thenCompose(reopened -> readFromStorage(reopened, firstEntry, lastEntry, expectedReadCount,
+                                false));
+            }
+
+            if (!(cause instanceof BKException
+                    && ((BKException) cause).getCode() == BKException.Code.TooManyRequestsException)) {
                 ml.invalidateLedgerHandle(lh);
                 pendingReadsManager.invalidateLedger(lh.getId());
             }
-            return null;
-        });
-        return readResult;
+
+            CompletableFuture<List<Entry>> failedFuture = CompletableFuture.failedFuture(cause);
+            return failedFuture;
+        }).thenCompose(Function.identity());
     }
 
     @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
index 2922a3d2679..42ae262e8dd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
@@ -32,8 +32,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.IntSupplier;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -183,6 +187,58 @@ public class RangeEntryCacheImplTest {
         verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L), eq(99L), any(), any(), any());
     }
 
+    @Test
+    public void testReadFromStorageRetriesWhenHandleClosed() {
+        RangeEntryCacheManagerImpl mockEntryCacheManager = mock(RangeEntryCacheManagerImpl.class);
+        ManagedLedgerFactoryMBeanImpl mlFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class);
+        when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean);
+        ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class);
+        ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class);
+        when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean);
+        when(mockManagedLedger.getName()).thenReturn("testManagedLedger");
+        when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class));
+        when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn(Optional.empty());
+        RangeCacheRemovalQueue mockRangeCacheRemovalQueue = mock(RangeCacheRemovalQueue.class);
+        when(mockRangeCacheRemovalQueue.addEntry(any())).thenReturn(true);
+        InflightReadsLimiter inflightReadsLimiter = mock(InflightReadsLimiter.class);
+        when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter);
+        doAnswer(invocation -> {
+            long permits = invocation.getArgument(0);
+            InflightReadsLimiter.Handle handle = new InflightReadsLimiter.Handle(permits, System.currentTimeMillis(),
+                    true);
+            return Optional.of(handle);
+        }).when(inflightReadsLimiter).acquire(anyLong(), any());
+
+        RangeEntryCacheImpl cache = new RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false,
+                mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT, mock(PendingReadsManager.class));
+
+        ReadHandle readHandle = mock(ReadHandle.class);
+        when(readHandle.getId()).thenReturn(1L);
+        when(mockManagedLedger.reopenReadHandle(1L)).thenReturn(CompletableFuture.completedFuture(readHandle));
+
+        LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(1L, 0L, 1, Unpooled.wrappedBuffer(new byte[] {1}));
+        LedgerEntries ledgerEntries = mock(LedgerEntries.class);
+        List<LedgerEntry> entryList = List.of((LedgerEntry) ledgerEntry);
+        when(ledgerEntries.iterator()).thenReturn(entryList.iterator());
+
+        AtomicInteger readAttempts = new AtomicInteger();
+        when(readHandle.readAsync(0L, 0L)).thenAnswer(invocation -> {
+            if (readAttempts.getAndIncrement() == 0) {
+                return CompletableFuture.failedFuture(new ManagedLedgerException.OffloadReadHandleClosedException());
+            }
+            return CompletableFuture.completedFuture(ledgerEntries);
+        });
+
+        CompletableFuture<List<Entry>> future = cache.readFromStorage(readHandle, 0L, 0L, () -> 1);
+        assertThat(future).isCompleted().satisfies(f -> {
+            List<Entry> entries = f.getNow(null);
+            assertThat(entries).hasSize(1);
+            assertThat(entries.get(0).getLedgerId()).isEqualTo(1L);
+            assertThat(entries.get(0).getEntryId()).isEqualTo(0L);
+        });
+        assertThat(readAttempts.get()).isEqualTo(2);
+    }
+
     private void performReadAndValidateResult() {
         CompletableFuture<List<Entry>> future = new CompletableFuture<>();
         rangeEntryCache.asyncReadEntry(lh, 0, 99, expectedReadCount, new AsyncCallbacks.ReadEntriesCallback() {
