This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ccaa970361e448405f295c7e73895696bfdceed4 Author: Penghui Li <[email protected]> AuthorDate: Fri Jan 16 01:55:31 2026 -0800 [fix][ml] Retry offload reads when OffloadReadHandleClosedException is encountered (#25148) (cherry picked from commit 16bcec35cb99b8f9666e1517b1af1be1f5d40c57) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 ++ .../mledger/impl/cache/RangeEntryCacheImpl.java | 36 ++++++-- .../impl/cache/RangeEntryCacheImplTest.java | 100 +++++++++++++++++++++ 3 files changed, 133 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ea8fc4d3a64..5b692c3f3be 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2087,6 +2087,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return Optional.ofNullable(ledgers.get(ledgerId)); } + public CompletableFuture<ReadHandle> reopenReadHandle(long ledgerId) { + invalidateReadHandle(ledgerId); + return getLedgerHandle(ledgerId); + } + CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) { CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId); if (ledgerHandle != null) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index ad5630c078a..93277088669 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Function; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; @@ -47,6 +48,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.RangeCache; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -442,6 +444,11 @@ public class RangeEntryCacheImpl implements EntryCache { */ CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { + return readFromStorage(lh, firstEntry, lastEntry, shouldCacheEntry, true); + } + + private CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, + boolean shouldCacheEntry, boolean allowRetry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) .thenApply( @@ -473,17 +480,30 @@ public class RangeEntryCacheImpl implements EntryCache { ledgerEntries.close(); } }); - // handle LH invalidation - readResult.exceptionally(exception -> { - if (exception instanceof BKException - && ((BKException) exception).getCode() == BKException.Code.TooManyRequestsException) { - } else { + + return readResult.handle((entries, exception) -> { + if (exception == null) { + return CompletableFuture.completedFuture(entries); + } + + Throwable cause = FutureUtil.unwrapCompletionException(exception); + if (allowRetry && cause instanceof ManagedLedgerException.OffloadReadHandleClosedException) { + log.info("[{}] Read handle closed for ledger {}, reopening", ml.getName(), lh.getId()); + pendingReadsManager.invalidateLedger(lh.getId()); + return ml.reopenReadHandle(lh.getId()) + .thenCompose(reopened -> readFromStorage(reopened, firstEntry, lastEntry, + shouldCacheEntry, false)); + } + + if (!(cause instanceof BKException + && ((BKException) cause).getCode() == BKException.Code.TooManyRequestsException)) { ml.invalidateLedgerHandle(lh); pendingReadsManager.invalidateLedger(lh.getId()); } - return null; - }); - return readResult; + + CompletableFuture<List<EntryImpl>> failedFuture = CompletableFuture.failedFuture(cause); + return failedFuture; + }).thenCompose(Function.identity()); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java new file mode 100644 index 00000000000..7590346e209 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImplTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import io.netty.buffer.Unpooled; +import io.opentelemetry.api.OpenTelemetry; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; +import org.testng.annotations.Test; + +public class RangeEntryCacheImplTest { + @Test + public void testReadFromStorageRetriesWhenHandleClosed() { + ManagedLedgerFactoryImpl mockFactory = mock(ManagedLedgerFactoryImpl.class); + ManagedLedgerFactoryMBeanImpl mockFactoryMBean = mock(ManagedLedgerFactoryMBeanImpl.class); + when(mockFactory.getMbean()).thenReturn(mockFactoryMBean); + when(mockFactory.getConfig()).thenReturn(new ManagedLedgerFactoryConfig()); + RangeEntryCacheManagerImpl mockEntryCacheManager = spy(new RangeEntryCacheManagerImpl(mockFactory, mock( + OrderedScheduler.class), OpenTelemetry.noop())); + ManagedLedgerImpl mockManagedLedger = mock(ManagedLedgerImpl.class); + ManagedLedgerMBeanImpl mockManagedLedgerMBean = mock(ManagedLedgerMBeanImpl.class); + when(mockManagedLedger.getMbean()).thenReturn(mockManagedLedgerMBean); + when(mockManagedLedger.getName()).thenReturn("testManagedLedger"); + when(mockManagedLedger.getExecutor()).thenReturn(mock(java.util.concurrent.ExecutorService.class)); + when(mockManagedLedger.getOptionalLedgerInfo(1L)).thenReturn(Optional.empty()); + InflightReadsLimiter inflightReadsLimiter = mock(InflightReadsLimiter.class); + when(mockEntryCacheManager.getInflightReadsLimiter()).thenReturn(inflightReadsLimiter); + doAnswer(invocation -> { + long permits = invocation.getArgument(0); + InflightReadsLimiter.Handle handle = new InflightReadsLimiter.Handle(permits, System.currentTimeMillis(), + true); + return Optional.of(handle); + }).when(inflightReadsLimiter).acquire(anyLong(), any()); + + RangeEntryCacheImpl cache = new RangeEntryCacheImpl(mockEntryCacheManager, mockManagedLedger, false); + + ReadHandle readHandle = mock(ReadHandle.class); + when(readHandle.getId()).thenReturn(1L); + when(mockManagedLedger.reopenReadHandle(1L)).thenReturn(CompletableFuture.completedFuture(readHandle)); + + LedgerEntryImpl ledgerEntry = LedgerEntryImpl.create(1L, 0L, 1, Unpooled.wrappedBuffer(new byte[]{1})); + LedgerEntries ledgerEntries = mock(LedgerEntries.class); + List<LedgerEntry> entryList = List.of((LedgerEntry) ledgerEntry); + when(ledgerEntries.iterator()).thenReturn(entryList.iterator()); + + AtomicInteger readAttempts = new AtomicInteger(); + when(readHandle.readAsync(0L, 0L)).thenAnswer(invocation -> { + if (readAttempts.getAndIncrement() == 0) { + return CompletableFuture.failedFuture(new ManagedLedgerException.OffloadReadHandleClosedException()); + } + return CompletableFuture.completedFuture(ledgerEntries); + }); + + CompletableFuture<List<EntryImpl>> future = cache.readFromStorage(readHandle, 0L, 0L, true); + assertThat(future).isCompleted().satisfies(f -> { + List<EntryImpl> entries = f.getNow(null); + assertThat(entries).hasSize(1); + assertThat(entries.get(0).getLedgerId()).isEqualTo(1L); + assertThat(entries.get(0).getEntryId()).isEqualTo(0L); + }); + assertThat(readAttempts.get()).isEqualTo(2); + } +} \ No newline at end of file
