This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 06be308844948596907402eecba6598df6eceb18 Author: Yong Zhang <[email protected]> AuthorDate: Thu Aug 11 21:01:55 2022 +0800 [fix][tiered-storage] move the state check forward (#17020) * [fix][tiered-storage] move the state check forward --- *Motivation* Move the closed-state check forward so that `getLastAddConfirmed()` does not throw an NPE. If the state is closed, the resource has been closed and the `OffloadIndexBlock` has been recycled, which causes an NPE when `getLastAddConfirmed()` is called. (cherry picked from commit ee0ea3a6f9ffb42d4ec129eb689d3c1059e5f4a8) --- .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 11 +++++++---- .../impl/BlobStoreManagedLedgerOffloaderTest.java | 23 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 73a4dd76e53..84d28692377 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -107,6 +107,11 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { List<LedgerEntry> entries = new ArrayList<LedgerEntry>(); boolean seeked = false; try { + if (state == State.Closed) { + log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", + ledgerId, firstEntry, lastEntry); + throw new BKException.BKUnexpectedConditionException(); + } if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > getLastAddConfirmed()) { @@ -125,10 +130,6 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { } while (entriesToRead > 0) { - if (state == State.Closed) { - log.warn("Reading a closed read handler. 
Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry); - throw new BKException.BKUnexpectedConditionException(); - } int length = dataStream.readInt(); if (length < 0) { // hit padding or new block inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset()); @@ -173,6 +174,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { promise.complete(LedgerEntriesImpl.create(entries)); } catch (Throwable t) { + log.error("Failed to read entries {} - {} from the offloader in ledger {}", + firstEntry, lastEntry, ledgerId, t); promise.completeExceptionally(t); entries.forEach(LedgerEntry::close); } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index 77dfc55b777..ab979f8a5a1 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -496,4 +496,27 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO fail("Get unexpected exception when reading entries", e); } } + + @Test + public void testReadWithAClosedLedgerHandler() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); + LedgerOffloader offloader = getOffloader(); + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); + Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); + long lac = toTest.getLastAddConfirmed(); + toTest.readAsync(0, lac).get(); + toTest.closeAsync().get(); + try { + toTest.readAsync(0, lac).get(); + } 
catch (Exception e) { + if (e.getCause() instanceof BKException.BKUnexpectedConditionException) { + // expected exception + return; + } + throw e; + } + } }
