This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f41f62d66fdff8b6790ccf00df50d33b9f50face
Author: Yan Zhao <[email protected]>
AuthorDate: Sun Aug 14 07:26:30 2022 +0800

    [fix][ML] Fix offload read handle NPE (#17056)
    
    (cherry picked from commit 6fc48d1186d0812d0e88d9e40515c147e8235555)
---
 .../bookkeeper/mledger/ManagedLedgerException.java |  7 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 25 +++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 26 ++++++++++
 .../mledger/impl/NonDurableCursorTest.java         | 59 ++++++++++++++++++++++
 .../PersistentDispatcherMultipleConsumers.java     |  3 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  3 +-
 .../streamingdispatch/StreamingEntryReader.java    |  3 +-
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 15 +++---
 .../impl/BlobStoreBackedReadHandleImplV2.java      | 30 ++++++++---
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  3 +-
 10 files changed, 152 insertions(+), 22 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 7046ba48193..347a380d7eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -189,6 +189,13 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class OffloadReadHandleClosedException extends 
ManagedLedgerException {
+
+        public OffloadReadHandleClosedException() {
+            super("Offload read handle already closed");
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 28232f31b17..9b771a3301d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2403,7 +2403,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     break;
                 }
                 // if truncate, all ledgers besides currentLedger are going to 
be deleted
-                if (isTruncate){
+                if (isTruncate) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Ledger {} will be truncated with ts 
{}",
                                 name, ls.getLedgerId(), ls.getTimestamp());
@@ -2431,11 +2431,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
                     ledgersToDelete.add(ls);
                 } else {
-                    // once retention constraint has been met, skip check
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Ledger {} not deleted. Neither expired 
nor over-quota", name, ls.getLedgerId());
+                    if (ls.getLedgerId() < 
getTheSlowestNonDurationReadPosition().getLedgerId()) {
+                        // once retention constraint has been met, skip check
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Ledger {} not deleted. Neither 
expired nor over-quota", name,
+                                    ls.getLedgerId());
+                        }
+                        invalidateReadHandle(ls.getLedgerId());
                     }
-                    invalidateReadHandle(ls.getLedgerId());
                 }
             }
 
@@ -4049,4 +4052,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    public Position getTheSlowestNonDurationReadPosition() {
+        PositionImpl theSlowestNonDurableReadPosition = PositionImpl.LATEST;
+        for (ManagedCursor cursor : cursors) {
+            if (cursor instanceof NonDurableCursorImpl) {
+                PositionImpl readPosition = (PositionImpl) 
cursor.getReadPosition();
+                if (readPosition.compareTo(theSlowestNonDurableReadPosition) < 
0) {
+                    theSlowestNonDurableReadPosition = readPosition;
+                }
+            }
+        }
+        return theSlowestNonDurableReadPosition;
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index ae835dccb3f..527137eab29 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -3520,6 +3521,31 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         });
     }
 
+    @Test
+    public void testGetTheSlowestNonDurationReadPosition() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test_",
+                new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, 
TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ledger.openCursor("c1");
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), 
PositionImpl.LATEST);
+
+        ManagedCursor nonDurableCursor = 
ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), 
positions.get(0));
+
+        ledger.deleteCursor(nonDurableCursor.getName());
+
+        Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), 
PositionImpl.LATEST);
+
+        ledger.close();
+    }
+
     @Test
     public void testGetLedgerMetadata() throws Exception {
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
factory.open("testGetLedgerMetadata");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 5d0271de3d6..e0f8f459bff 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class NonDurableCursorTest extends MockedBookKeeperTestCase {
@@ -735,6 +736,64 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws 
Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new 
ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, 
TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = 
ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new 
AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, 
Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        
Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId()));
+        
Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId()));
+        
Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId()));
+        
Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId()));
+        
Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId()));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        
Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId()));
+        
Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId()));
+        
Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId()));
+        
Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId()));
+        
Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId()));
+
+        ledger.close();
+    }
+
     @Test(expectedExceptions = NullPointerException.class)
     void testCursorWithNameIsNotNull() throws Exception {
         final String p1CursorName = "entry-1";
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 3f33995c6a2..e6cc83ee4be 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -619,7 +619,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // Notify the consumer only if all the messages were already 
acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof 
TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof 
TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof 
ManagedLedgerException.OffloadReadHandleClosedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, Read 
Type {} - Retrying to read in {} seconds",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index d59b84b570e..727c4f09af9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -467,7 +467,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 // Notify the consumer only if all the messages were already 
acknowledged
                 consumers.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof 
TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof 
TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof 
ManagedLedgerException.OffloadReadHandleClosedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, - 
Retrying to read in {} seconds", name,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index 85083da93c0..835269ca9d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -197,7 +197,8 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
         PositionImpl readPosition = pendingReadEntryRequest.position;
         pendingReadEntryRequest.retry++;
         long waitTimeMillis = readFailureBackoff.next();
-        if (exception.getCause() instanceof 
TransactionBufferException.TransactionNotSealedException) {
+        if (exception.getCause() instanceof 
TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof 
ManagedLedgerException.OffloadReadHandleClosedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, - 
Retrying to read in {} seconds",
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 84d28692377..6675e084e97 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
@@ -103,15 +104,17 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, 
lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        executor.submit(() -> {
+        executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read 
range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new 
ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
+
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             boolean seeked = false;
             try {
-                if (state == State.Closed) {
-                    log.warn("Reading a closed read handler. Ledger ID: {}, 
Read range: {}-{}",
-                        ledgerId, firstEntry, lastEntry);
-                    throw new BKException.BKUnexpectedConditionException();
-                }
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index 0dba4b0fc6a..ac6f121e818 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
@@ -56,6 +57,12 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     private final List<BackedInputStream> inputStreams;
     private final List<DataInputStream> dataStreams;
     private final ExecutorService executor;
+    private State state = null;
+
+    enum State {
+        Opened,
+        Closed
+    }
 
     static class GroupedReader {
         @Override
@@ -97,6 +104,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
             dataStreams.add(new DataInputStream(inputStream));
         }
         this.executor = executor;
+        this.state = State.Opened;
     }
 
     @Override
@@ -121,6 +129,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
                 for (DataInputStream dataStream : dataStreams) {
                     dataStream.close();
                 }
+                state = State.Closed;
                 promise.complete(null);
             } catch (IOException t) {
                 promise.completeExceptionally(t);
@@ -133,13 +142,20 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long 
lastEntry) {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, 
lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        if (firstEntry > lastEntry
-                || firstEntry < 0
-                || lastEntry > getLastAddConfirmed()) {
-            promise.completeExceptionally(new IllegalArgumentException());
-            return promise;
-        }
-        executor.submit(() -> {
+        executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read 
range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new 
ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
+
+            if (firstEntry > lastEntry
+                    || firstEntry < 0
+                    || lastEntry > getLastAddConfirmed()) {
+                promise.completeExceptionally(new 
BKException.BKIncorrectParameterException());
+                return;
+            }
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             List<GroupedReader> groupedReaders = null;
             try {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index ab979f8a5a1..b78666ad645 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
 import 
org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
 import org.jclouds.blobstore.BlobStore;
@@ -512,7 +513,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
         try {
             toTest.readAsync(0, lac).get();
         } catch (Exception e) {
-            if (e.getCause() instanceof 
BKException.BKUnexpectedConditionException) {
+            if (e.getCause() instanceof 
ManagedLedgerException.OffloadReadHandleClosedException) {
                 // expected exception
                 return;
             }

Reply via email to