This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e36dfbfdeba336172e5e74b14329231d823a36ae Author: Yong Zhang <[email protected]> AuthorDate: Thu Aug 11 21:01:55 2022 +0800 [fix][tiered-storage] move the state check forward (#17020) * [fix][tiered-storage] move the state check forward --- *Motivation* Move the closed-state check forward so that `getLastAddConfirmed()` does not throw an NPE. If the state is closed, the resource has been closed and the `OffloadIndexBlock` has been recycled, which causes an NPE when `getLastAddConfirmed()` is called. --- .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 12 ++++++----- .../impl/BlobStoreManagedLedgerOffloaderTest.java | 25 +++++++++++++++++++++- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 38ba38a8e65..ab64388cd4d 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -120,6 +120,11 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { List<LedgerEntry> entries = new ArrayList<LedgerEntry>(); boolean seeked = false; try { + if (state == State.Closed) { + log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", + ledgerId, firstEntry, lastEntry); + throw new BKException.BKUnexpectedConditionException(); + } if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > getLastAddConfirmed()) { @@ -138,11 +143,6 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { } while (entriesToRead > 0) { - if (state == State.Closed) { - log.warn("Reading a closed read handler. 
Ledger ID: {}, Read range: {}-{}", - ledgerId, firstEntry, lastEntry); - throw new BKException.BKUnexpectedConditionException(); - } long currentPosition = inputStream.getCurrentPosition(); int length = dataStream.readInt(); if (length < 0) { // hit padding or new block @@ -189,6 +189,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { promise.complete(LedgerEntriesImpl.create(entries)); } catch (Throwable t) { + log.error("Failed to read entries {} - {} from the offloader in ledger {}", + firstEntry, lastEntry, ledgerId, t); promise.completeExceptionally(t); entries.forEach(LedgerEntry::close); } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index 74d03b11dba..e6b0cc156ad 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -566,4 +566,27 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO OffloadedLedgerMetadata offloadedLedgerMetadata2 = result.get(1); assertEquals(toWrite.getId(), offloadedLedgerMetadata2.getLedgerId()); } -} \ No newline at end of file + + @Test + public void testReadWithAClosedLedgerHandler() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); + LedgerOffloader offloader = getOffloader(); + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); + Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); + long lac = toTest.getLastAddConfirmed(); + toTest.readAsync(0, 
lac).get(); + toTest.closeAsync().get(); + try { + toTest.readAsync(0, lac).get(); + } catch (Exception e) { + if (e.getCause() instanceof BKException.BKUnexpectedConditionException) { + // expected exception + return; + } + throw e; + } + } +}
