This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 16bcec35cb9 [fix][ml] Retry offload reads when
OffloadReadHandleClosedException is encountered (#25148)
16bcec35cb9 is described below
commit 16bcec35cb99b8f9666e1517b1af1be1f5d40c57
Author: Penghui Li <[email protected]>
AuthorDate: Fri Jan 16 01:55:31 2026 -0800
[fix][ml] Retry offload reads when OffloadReadHandleClosedException is
encountered (#25148)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 ++
.../mledger/impl/cache/RangeEntryCacheImpl.java | 35 ++++++++++----
.../impl/cache/RangeEntryCacheImplTest.java | 56 ++++++++++++++++++++++
3 files changed, 88 insertions(+), 8 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index bb682cdb293..6dd886fe4ea 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2193,6 +2193,11 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
return Optional.ofNullable(ledgers.get(ledgerId));
}
+ public CompletableFuture<ReadHandle> reopenReadHandle(long ledgerId) {
+ invalidateReadHandle(ledgerId);
+ return getLedgerHandle(ledgerId);
+ }
+
CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
if (ledgerHandle != null) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
index f8ee2c431b1..4dc60b6434f 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
@@ -32,6 +32,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
import java.util.function.IntSupplier;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -519,6 +520,11 @@ public class RangeEntryCacheImpl implements EntryCache {
*/
CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long
firstEntry, long lastEntry,
IntSupplier
expectedReadCount) {
+ return readFromStorage(lh, firstEntry, lastEntry, expectedReadCount,
true);
+ }
+
+ private CompletableFuture<List<Entry>> readFromStorage(ReadHandle lh, long
firstEntry, long lastEntry,
+ IntSupplier
expectedReadCount, boolean allowRetry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<Entry>> readResult =
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
.thenApply(
@@ -550,17 +556,30 @@ public class RangeEntryCacheImpl implements EntryCache {
ledgerEntries.close();
}
});
- // handle LH invalidation
- readResult.exceptionally(exception -> {
- if (exception instanceof BKException
- && ((BKException) exception).getCode() ==
BKException.Code.TooManyRequestsException) {
- } else {
+
+ return readResult.handle((entries, exception) -> {
+ if (exception == null) {
+ return CompletableFuture.completedFuture(entries);
+ }
+
+ Throwable cause = FutureUtil.unwrapCompletionException(exception);
+ if (allowRetry && cause instanceof
ManagedLedgerException.OffloadReadHandleClosedException) {
+ log.info("[{}] Read handle closed for ledger {}, reopening",
ml.getName(), lh.getId());
+ pendingReadsManager.invalidateLedger(lh.getId());
+ return ml.reopenReadHandle(lh.getId())
+ .thenCompose(reopened -> readFromStorage(reopened,
firstEntry, lastEntry, expectedReadCount,
+ false));
+ }
+
+ if (!(cause instanceof BKException
+ && ((BKException) cause).getCode() ==
BKException.Code.TooManyRequestsException)) {
ml.invalidateLedgerHandle(lh);
pendingReadsManager.invalidateLedger(lh.getId());
}
- return null;
- });
- return readResult;
+
+ CompletableFuture<List<Entry>> failedFuture =
CompletableFuture.failedFuture(cause);
+ return failedFuture;
+ }).thenCompose(Function.identity());
}
@Override
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
index 2922a3d2679..42ae262e8dd 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java
@@ -32,8 +32,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -183,6 +187,58 @@ public class RangeEntryCacheImplTest {
verify(pendingReadsManager, times(1)).readEntries(any(), eq(0L),
eq(99L), any(), any(), any());
}
+ @Test
+ public void testReadFromStorageRetriesWhenHandleClosed() {
+ RangeEntryCacheManagerImpl mockEntryCacheManager =
mock(RangeEntryCacheManagerImpl.class);
+ ManagedLedgerFactoryMBeanImpl mlFactoryMBean =
mock(ManagedLedgerFactoryMBeanImpl.class);
+
when(mockEntryCacheManager.getMlFactoryMBean()).thenReturn(mlFactoryMBean);
+ ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class);
+ ManagedLedgerMBeanImpl mockManagedLedgerMBean =
mock(ManagedLedgerMBeanImpl.class);
+ when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean);
+ when(mockManagedLedger.getName()).thenReturn("testManagedLedger");
+
when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class));
+
when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn(Optional.empty());
+ RangeCacheRemovalQueue mockRangeCacheRemovalQueue =
mock(RangeCacheRemovalQueue.class);
+ when(mockRangeCacheRemovalQueue.addEntry(any())).thenReturn(true);
+ InflightReadsLimiter inflightReadsLimiter =
mock(InflightReadsLimiter.class);
+
when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter);
+ doAnswer(invocation -> {
+ long permits = invocation.getArgument(0);
+ InflightReadsLimiter.Handle handle = new
InflightReadsLimiter.Handle(permits, System.currentTimeMillis(),
+ true);
+ return Optional.of(handle);
+ }).when(inflightReadsLimiter).acquire(anyLong(), any());
+
+ RangeEntryCacheImpl cache = new
RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false,
+ mockRangeCacheRemovalQueue, EntryLengthFunction.DEFAULT,
mock(PendingReadsManager.class));
+
+ ReadHandle readHandle = mock(ReadHandle.class);
+ when(readHandle.getId()).thenReturn(1L);
+
when(mockManagedLedger.reopenReadHandle(1L)).thenReturn(CompletableFuture.completedFuture(readHandle));
+
+ LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(1L, 0L, 1,
Unpooled.wrappedBuffer(new byte[] {1}));
+ LedgerEntries ledgerEntries = mock(LedgerEntries.class);
+ List<LedgerEntry> entryList = List.of((LedgerEntry) ledgerEntry);
+ when(ledgerEntries.iterator()).thenReturn(entryList.iterator());
+
+ AtomicInteger readAttempts = new AtomicInteger();
+ when(readHandle.readAsync(0L, 0L)).thenAnswer(invocation -> {
+ if (readAttempts.getAndIncrement() == 0) {
+ return CompletableFuture.failedFuture(new
ManagedLedgerException.OffloadReadHandleClosedException());
+ }
+ return CompletableFuture.completedFuture(ledgerEntries);
+ });
+
+ CompletableFuture<List<Entry>> future =
cache.readFromStorage(readHandle, 0L, 0L, () -> 1);
+ assertThat(future).isCompleted().satisfies(f -> {
+ List<Entry> entries = f.getNow(null);
+ assertThat(entries).hasSize(1);
+ assertThat(entries.get(0).getLedgerId()).isEqualTo(1L);
+ assertThat(entries.get(0).getEntryId()).isEqualTo(0L);
+ });
+ assertThat(readAttempts.get()).isEqualTo(2);
+ }
+
private void performReadAndValidateResult() {
CompletableFuture<List<Entry>> future = new CompletableFuture<>();
rangeEntryCache.asyncReadEntry(lh, 0, 99, expectedReadCount, new
AsyncCallbacks.ReadEntriesCallback() {