This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 57b9824a0d3afe4a10bbcada91a3903cce1f2f57 Author: Yan Zhao <[email protected]> AuthorDate: Sun Aug 14 07:26:30 2022 +0800 [fix][ML] Fix offload read handle NPE (#17056) --- .../bookkeeper/mledger/ManagedLedgerException.java | 7 +++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 ++++++++-- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 26 ++++++++++ .../mledger/impl/NonDurableCursorTest.java | 59 ++++++++++++++++++++++ .../PersistentDispatcherMultipleConsumers.java | 3 +- .../PersistentDispatcherSingleActiveConsumer.java | 3 +- .../streamingdispatch/StreamingEntryReader.java | 3 +- .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 13 +++-- .../impl/BlobStoreBackedReadHandleImplV2.java | 28 +++++++--- .../impl/BlobStoreManagedLedgerOffloaderTest.java | 3 +- 10 files changed, 151 insertions(+), 20 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 7046ba48193..347a380d7eb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -189,6 +189,13 @@ public class ManagedLedgerException extends Exception { } } + public static class OffloadReadHandleClosedException extends ManagedLedgerException { + + public OffloadReadHandleClosedException() { + super("Offload read handle already closed"); + } + } + @Override public synchronized Throwable fillInStackTrace() { // Disable stack traces to be filled in diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 29c78dd7bbb..a2282ee0f92 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2509,7 +2509,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { break; } // if truncate, all ledgers besides currentLedger are going to be deleted - if (isTruncate){ + if (isTruncate) { if (log.isDebugEnabled()) { log.debug("[{}] Ledger {} will be truncated with ts {}", name, ls.getLedgerId(), ls.getTimestamp()); @@ -2537,11 +2537,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } ledgersToDelete.add(ls); } else { - // once retention constraint has been met, skip check - if (log.isDebugEnabled()) { - log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId()); + if (ls.getLedgerId() < getTheSlowestNonDurationReadPosition().getLedgerId()) { + // once retention constraint has been met, skip check + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, + ls.getLedgerId()); + } + invalidateReadHandle(ls.getLedgerId()); } - invalidateReadHandle(ls.getLedgerId()); } } @@ -4207,4 +4210,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } } + + public Position getTheSlowestNonDurationReadPosition() { + PositionImpl theSlowestNonDurableReadPosition = PositionImpl.LATEST; + for (ManagedCursor cursor : cursors) { + if (cursor instanceof NonDurableCursorImpl) { + PositionImpl readPosition = (PositionImpl) cursor.getReadPosition(); + if (readPosition.compareTo(theSlowestNonDurableReadPosition) < 0) { + theSlowestNonDurableReadPosition = readPosition; + } + } + } + return theSlowestNonDurableReadPosition; + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d556ff8eb63..3293c98c5c6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -3840,6 +3841,31 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { }); } + @Test + public void testGetTheSlowestNonDurationReadPosition() throws Exception { + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test_", + new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS) + .setRetentionSizeInMB(-1)); + ledger.openCursor("c1"); + + List<Position> positions = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8))); + } + + Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), PositionImpl.LATEST); + + ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST); + + Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), positions.get(0)); + + ledger.deleteCursor(nonDurableCursor.getName()); + + Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), PositionImpl.LATEST); + + ledger.close(); + } + @Test public void testGetLedgerMetadata() throws Exception { ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("testGetLedgerMetadata"); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 29ba99687ba..f0d71f2ed18 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.Test; public class NonDurableCursorTest extends MockedBookKeeperTestCase { @@ -737,6 +738,64 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase { ledger.close(); } + @Test + public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception { + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor", + new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS) + .setRetentionSizeInMB(-1)); + ManagedCursor c1 = ledger.openCursor("c1"); + ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST); + + List<Position> positions = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8))); + } + + CountDownLatch latch = new CountDownLatch(10); + for (int i = 0; i < 10; i++) { + ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + latch.countDown(); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }, null); + } + + latch.await(); + + c1.markDelete(positions.get(4)); + + CompletableFuture<Void> promise = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise); + promise.join(); + + Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId())); + Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId())); + Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId())); + Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId())); + Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId())); + + promise = new CompletableFuture<>(); + + nonDurableCursor.markDelete(positions.get(3)); + + ledger.internalTrimConsumedLedgers(promise); + promise.join(); + + Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId())); + Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId())); + Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId())); + Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId())); + Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId())); + + ledger.close(); + } + @Test(expectedExceptions = NullPointerException.class) void testCursorWithNameIsNotNull() throws Exception { final String p1CursorName = "entry-1"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index ea3b9bbe540..7ed277ddfda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -792,7 +792,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul // Notify the consumer only if all the messages were already acknowledged consumerList.forEach(Consumer::reachedEndOfTopic); } - } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) { + } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException + || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) { waitTimeMillis = 1; if (log.isDebugEnabled()) { log.debug("[{}] Error reading transaction entries : {}, Read Type {} - Retrying to read in {} seconds", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 022a86ce9a0..608b0fa503f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -497,7 +497,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher // Notify the consumer only if all the messages were already acknowledged consumers.forEach(Consumer::reachedEndOfTopic); } - } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) { + } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException + || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) { waitTimeMillis = 1; if (log.isDebugEnabled()) { log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", name, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java index ec46d32548f..3f6f82200b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java @@ -197,7 +197,8 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W PositionImpl readPosition = pendingReadEntryRequest.position; pendingReadEntryRequest.retry++; long waitTimeMillis = readFailureBackoff.next(); - if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) { + if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException + || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) { waitTimeMillis = 1; if (log.isDebugEnabled()) { log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index ab64388cd4d..499084ab9cd 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -39,6 +39,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; @@ -117,14 +118,16 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle { } CompletableFuture<LedgerEntries> promise = new CompletableFuture<>(); executor.execute(() -> { + if (state == State.Closed) { + log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", + ledgerId, firstEntry, lastEntry); + promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException()); + return; + } + List<LedgerEntry> entries = new ArrayList<LedgerEntry>(); boolean seeked = false; try { - if (state == State.Closed) { - log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", - ledgerId, firstEntry, lastEntry); - throw new BKException.BKUnexpectedConditionException(); - } if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > getLastAddConfirmed()) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index b896f38d390..495a6e2fcb3 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder; @@ -58,6 +59,12 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle { private final List<BackedInputStream> inputStreams; private final List<DataInputStream> dataStreams; private final ExecutorService executor; + private State state = null; + + enum State { + Opened, + Closed + } static class GroupedReader { @Override @@ -99,6 +106,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle { dataStreams.add(new DataInputStream(inputStream)); } this.executor = executor; + this.state = State.Opened; } @Override @@ -123,6 +131,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle { for (DataInputStream dataStream : dataStreams) { dataStream.close(); } + state = State.Closed; promise.complete(null); } catch (IOException t) { promise.completeExceptionally(t); @@ -135,13 +144,20 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle { public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry); CompletableFuture<LedgerEntries> promise = new CompletableFuture<>(); - if (firstEntry > lastEntry - || firstEntry < 0 - || lastEntry > getLastAddConfirmed()) { - promise.completeExceptionally(new IllegalArgumentException()); - return promise; - } executor.execute(() -> { + if (state == State.Closed) { + log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", + ledgerId, firstEntry, lastEntry); + promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException()); + return; + } + + if (firstEntry > lastEntry + || firstEntry < 0 + || lastEntry > getLastAddConfirmed()) { + promise.completeExceptionally(new BKException.BKIncorrectParameterException()); + return; + } List<LedgerEntry> entries = new ArrayList<LedgerEntry>(); List<GroupedReader> groupedReaders = null; try { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index e6b0cc156ad..6f499d153e2 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -44,6 +44,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl; import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata; import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider; @@ -582,7 +583,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO try { toTest.readAsync(0, lac).get(); } catch (Exception e) { - if (e.getCause() instanceof BKException.BKUnexpectedConditionException) { + if (e.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) { // expected exception return; }
