This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ec1d071c71 [feat][broker][PIP-195]Implement Filter out all delayed 
messages and skip them when reading messages from bookies - part7 (#19035)
9ec1d071c71 is described below

commit 9ec1d071c7188a2db694e9d7b359faaf33cb076e
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Jan 3 09:37:56 2023 +0800

    [feat][broker][PIP-195]Implement Filter out all delayed messages and skip 
them when reading messages from bookies - part7 (#19035)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   | 64 +++++++++++++++++++++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 30 ++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 33 +++++++++++
 .../bookkeeper/mledger/impl/OpReadEntry.java       | 28 +++++++--
 .../org/apache/bookkeeper/mledger/impl/OpScan.java |  2 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 67 +++++++++++++++++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  4 +-
 .../PersistentDispatcherMultipleConsumers.java     | 18 +++++-
 9 files changed, 230 insertions(+), 18 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 4167c040ed6..7802ed07781 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -167,6 +167,21 @@ public interface ManagedCursor {
     void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, 
ReadEntriesCallback callback,
                           Object ctx, PositionImpl maxPosition);
 
+    /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     * @param maxPosition           maximum position that can be read
+     * @param skipCondition         predicate matching the positions to filter out (skip) while reading
+     */
+    default void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long 
maxSizeBytes, ReadEntriesCallback callback,
+                                          Object ctx, PositionImpl 
maxPosition, Predicate<PositionImpl> skipCondition) {
+        asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, callback, ctx, 
maxPosition);
+    }
+
     /**
      * Get 'N'th entry from the mark delete position in the cursor without 
updating any cursor positions.
      *
@@ -264,6 +279,55 @@ public interface ManagedCursor {
     void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, 
ReadEntriesCallback callback, Object ctx,
                                 PositionImpl maxPosition);
 
+    /**
+     * Asynchronously read entries from the ManagedLedger, up to the specified number.
+     *
+     * <p/>If no entries are available, the callback will not be triggered. 
Instead it will be registered to wait until
+     * a new message will be persisted into the managed ledger
+     *
+     * @see #readEntriesOrWait(int, long)
+     * @param maxEntries
+     *            maximum number of entries to return
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     * @param maxPosition
+     *            maximum position that can be read
+     * @param skipCondition
+     *            predicate matching the positions to filter out (skip) while reading
+     */
+    default void asyncReadEntriesWithSkipOrWait(int maxEntries, 
ReadEntriesCallback callback, Object ctx,
+                                                PositionImpl maxPosition, 
Predicate<PositionImpl> skipCondition) {
+        asyncReadEntriesOrWait(maxEntries, callback, ctx, maxPosition);
+    }
+
+    /**
+     * Asynchronously read entries from the ManagedLedger, up to the specified 
number and size.
+     *
+     * <p/>If no entries are available, the callback will not be triggered. 
Instead it will be registered to wait until
+     * a new message will be persisted into the managed ledger
+     *
+     * @see #readEntriesOrWait(int, long)
+     * @param maxEntries
+     *            maximum number of entries to return
+     * @param maxSizeBytes
+     *            max size in bytes of the entries to return
+     * @param callback
+     *            callback object
+     * @param ctx
+     *            opaque context
+     * @param maxPosition
+     *            maximum position that can be read
+     * @param skipCondition
+     *            predicate matching the positions to filter out (skip) while reading
+     */
+    default void asyncReadEntriesWithSkipOrWait(int maxEntries, long 
maxSizeBytes, ReadEntriesCallback callback,
+                                                Object ctx, PositionImpl 
maxPosition,
+                                                Predicate<PositionImpl> 
skipCondition) {
+        asyncReadEntriesOrWait(maxEntries, maxSizeBytes, callback, ctx, 
maxPosition);
+    }
+
     /**
      * Cancel a previously scheduled asyncReadEntriesOrWait operation.
      *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3af2e568d86..5b351c99649 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -762,13 +762,19 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     @Override
     public void asyncReadEntries(final int numberOfEntriesToRead, final 
ReadEntriesCallback callback,
-            final Object ctx, PositionImpl maxPosition) {
+                                 final Object ctx, PositionImpl maxPosition) {
         asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, 
ctx, maxPosition);
     }
 
     @Override
     public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, 
ReadEntriesCallback callback,
                                  Object ctx, PositionImpl maxPosition) {
+        asyncReadEntriesWithSkip(numberOfEntriesToRead, maxSizeBytes, 
callback, ctx, maxPosition, null);
+    }
+
+    @Override
+    public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long 
maxSizeBytes, ReadEntriesCallback callback,
+                                 Object ctx, PositionImpl maxPosition, 
Predicate<PositionImpl> skipCondition) {
         checkArgument(numberOfEntriesToRead > 0);
         if (isClosed()) {
             callback.readEntriesFailed(new ManagedLedgerException
@@ -779,7 +785,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, 
maxSizeBytes);
 
         PENDING_READ_OPS_UPDATER.incrementAndGet(this);
-        OpReadEntry op = OpReadEntry.create(this, readPosition, 
numOfEntriesToRead, callback, ctx, maxPosition);
+        OpReadEntry op =
+                OpReadEntry.create(this, readPosition, numOfEntriesToRead, 
callback, ctx, maxPosition, skipCondition);
         ledger.asyncReadEntries(op);
     }
 
@@ -901,6 +908,20 @@ public class ManagedCursorImpl implements ManagedCursor {
     @Override
     public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, 
ReadEntriesCallback callback, Object ctx,
                                        PositionImpl maxPosition) {
+        asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, 
ctx, maxPosition, null);
+    }
+
+    @Override
+    public void asyncReadEntriesWithSkipOrWait(int maxEntries, 
ReadEntriesCallback callback,
+                                               Object ctx, PositionImpl 
maxPosition,
+                                               Predicate<PositionImpl> 
skipCondition) {
+        asyncReadEntriesWithSkipOrWait(maxEntries, NO_MAX_SIZE_LIMIT, 
callback, ctx, maxPosition, skipCondition);
+    }
+
+    @Override
+    public void asyncReadEntriesWithSkipOrWait(int maxEntries, long 
maxSizeBytes, ReadEntriesCallback callback,
+                                               Object ctx, PositionImpl 
maxPosition,
+                                               Predicate<PositionImpl> 
skipCondition) {
         checkArgument(maxEntries > 0);
         if (isClosed()) {
             callback.readEntriesFailed(new 
CursorAlreadyClosedException("Cursor was already closed"), ctx);
@@ -914,10 +935,11 @@ public class ManagedCursorImpl implements ManagedCursor {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Read entries immediately", 
ledger.getName(), name);
             }
-            asyncReadEntries(numberOfEntriesToRead, callback, ctx, 
maxPosition);
+            asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, 
callback, ctx,
+                    maxPosition, skipCondition);
         } else {
             OpReadEntry op = OpReadEntry.create(this, readPosition, 
numberOfEntriesToRead, callback,
-                    ctx, maxPosition);
+                    ctx, maxPosition, skipCondition);
 
             if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                 op.recycle();
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 290e4bb530f..3a80d122272 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2054,6 +2054,39 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 
         long lastEntry = min(firstEntry + 
opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
 
+        // Filter out entries matched by skipCondition; read only the first contiguous run of non-skipped entries
+        if (opReadEntry.skipCondition != null) {
+            long firstValidEntry = -1L;
+            long lastValidEntry = -1L;
+            long entryId = firstEntry;
+            for (; entryId <= lastEntry; entryId++) {
+                if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+                    if (firstValidEntry != -1L) {
+                        break; // a skipped entry ends the contiguous run of entries to read
+                    }
+                } else {
+                    if (firstValidEntry == -1L) {
+                        firstValidEntry = entryId; // first entry that actually has to be read
+                    }
+                }
+
+                if (firstValidEntry != -1L) {
+                    lastValidEntry = entryId;
+                }
+            }
+
+            // If all messages in [firstEntry...lastEntry] are filtered out,
+            // then manually call internalReadEntriesComplete to advance the read position.
+            if (firstValidEntry == -1L) {
+                opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
+                        PositionImpl.get(ledger.getId(), lastEntry));
+                return;
+            }
+
+            firstEntry = firstValidEntry;
+            lastEntry = lastValidEntry;
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entries from ledger {} - first={} 
last={}", name, ledger.getId(), firstEntry,
                     lastEntry);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 86290d8bc8f..04437b5320d 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -22,7 +22,9 @@ import static 
org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.function.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -45,8 +47,10 @@ class OpReadEntry implements ReadEntriesCallback {
     private PositionImpl nextReadPosition;
     PositionImpl maxPosition;
 
+    Predicate<PositionImpl> skipCondition;
+
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl 
readPositionRef, int count,
-            ReadEntriesCallback callback, Object ctx, PositionImpl 
maxPosition) {
+            ReadEntriesCallback callback, Object ctx, PositionImpl 
maxPosition, Predicate<PositionImpl> skipCondition) {
         OpReadEntry op = RECYCLER.get();
         op.readPosition = 
cursor.ledger.startReadOperationOnLedger(readPositionRef);
         op.cursor = cursor;
@@ -57,13 +61,13 @@ class OpReadEntry implements ReadEntriesCallback {
             maxPosition = PositionImpl.LATEST;
         }
         op.maxPosition = maxPosition;
+        op.skipCondition = skipCondition;
         op.ctx = ctx;
         op.nextReadPosition = PositionImpl.get(op.readPosition);
         return op;
     }
 
-    @Override
-    public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
+    void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, 
PositionImpl lastPosition) {
         // Filter the returned entries for individual deleted messages
         int entriesCount = returnedEntries.size();
         long entriesSize = 0;
@@ -72,13 +76,19 @@ class OpReadEntry implements ReadEntriesCallback {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        final PositionImpl lastPosition = (PositionImpl) 
returnedEntries.get(entriesCount - 1).getPosition();
+        if (entriesCount != 0) {
+            lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 
1).getPosition();
+        }
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Read entries succeeded batch_size={} 
cumulative_size={} requested_count={}",
                     cursor.ledger.getName(), cursor.getName(), 
returnedEntries.size(), entries.size(), count);
         }
-        List<Entry> filteredEntries = 
cursor.filterReadEntries(returnedEntries);
-        entries.addAll(filteredEntries);
+
+        List<Entry> filteredEntries = Collections.emptyList();
+        if (entriesCount != 0) {
+            filteredEntries = cursor.filterReadEntries(returnedEntries);
+            entries.addAll(filteredEntries);
+        }
 
         // if entries have been filtered out then try to skip reading of 
already deletedMessages in that range
         final Position nexReadPosition = entriesCount != filteredEntries.size()
@@ -87,6 +97,11 @@ class OpReadEntry implements ReadEntriesCallback {
         checkReadCompletion();
     }
 
+    @Override
+    public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
+        internalReadEntriesComplete(returnedEntries, ctx, null);
+    }
+
     @Override
     public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
         cursor.readOperationCompleted();
@@ -190,6 +205,7 @@ class OpReadEntry implements ReadEntriesCallback {
         nextReadPosition = null;
         maxPosition = null;
+        skipCondition = null; // reset before recycling: the instance may be reused once the handle is recycled
         recyclerHandle.recycle(this);
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(OpReadEntry.class);
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 9f1cd310f92..6d68b042a7a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -128,7 +128,7 @@ class OpScan implements ReadEntriesCallback {
         }
         if (cursor.hasMoreEntries(searchPosition)) {
             OpReadEntry opReadEntry = OpReadEntry.create(cursor, 
searchPosition, batchSize,
-            this, OpScan.this.ctx, null);
+            this, OpScan.this.ctx, null, null);
             ledger.asyncReadEntries(opReadEntry);
         } else {
             callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, 
OpScan.this.ctx);
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 54ebc4c293a..2c01b778caf 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -33,8 +33,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Predicate;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index fa3d6ec6c90..7e1731e573a 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -55,6 +55,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -4126,7 +4127,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 
         // op readPosition is bigger than maxReadPosition
         OpReadEntry opReadEntry = OpReadEntry.create(cursor, 
ledger.lastConfirmedEntry, 10, callback,
-                null, PositionImpl.get(lastPosition.getLedgerId(), -1));
+                null, PositionImpl.get(lastPosition.getLedgerId(), -1), null);
         Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
         field.setAccessible(true);
         field.set(cursor, PositionImpl.EARLIEST);
@@ -4148,7 +4149,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         };
 
         @Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = 
Mockito.mockStatic(OpReadEntry.class);
-        mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), 
anyInt(), any(), any(), any()))
+        mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), 
anyInt(), any(), any(), any(), any()))
                 .thenAnswer(__ -> createOpReadEntry.get());
 
         final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
@@ -4252,5 +4253,67 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         factory2.shutdown();
     }
 
+    @Test
+    public void testReadEntriesWithFilterOut() throws ManagedLedgerException, 
InterruptedException, ExecutionException {
+        int readMaxNumber = 10;
+        int sendNumber = 20;
+        ManagedLedger ledger = factory.open("testReadEntriesWithFilter");
+        ManagedCursor cursor = ledger.openCursor("c");
+        Position position = PositionImpl.EARLIEST;
+        Position maxCanReadPosition = PositionImpl.EARLIEST;
+        for (int i = 0; i < sendNumber; i++) {
+            if (i == readMaxNumber - 1) {
+                position = ledger.addEntry(new byte[1024]);
+            } else if (i == sendNumber - 1) {
+                maxCanReadPosition = ledger.addEntry(new byte[1024]);
+            } else {
+                ledger.addEntry(new byte[1024]);
+            }
+
+        }
+        CompletableFuture<Integer> completableFuture = new 
CompletableFuture<>();
+        cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new 
ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                completableFuture.complete(entries.size());
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                completableFuture.completeExceptionally(exception);
+            }
+        }, null, (PositionImpl) position, pos -> {
+            return pos.getEntryId() % 2 != 0;
+        });
+
+        int number = completableFuture.get();
+        assertEquals(number, readMaxNumber / 2);
+
+        assertEquals(cursor.getReadPosition().getEntryId(), 10);
+
+        CompletableFuture<Integer> completableFuture2 = new 
CompletableFuture<>();
+        cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new 
ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                completableFuture2.complete(entries.size());
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, 
Object ctx) {
+                completableFuture2.completeExceptionally(exception);
+            }
+        }, null, (PositionImpl) maxCanReadPosition, pos -> {
+            return pos.getEntryId() % 2 != 0;
+        });
+
+        int number2 = completableFuture2.get();
+        assertEquals(number2, readMaxNumber / 2);
+
+        assertEquals(cursor.getReadPosition().getEntryId(), 20);
+
+        cursor.close();
+        ledger.close();
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1338c714c73..3b011fe8d56 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -555,7 +555,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                     public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 
                     }
-                }, null, maxPosition);
+                }, null, maxPosition, null);
         Assert.assertEquals(opReadEntry.readPosition, position);
     }
 
@@ -3030,7 +3030,7 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
                 responseException2.set(exception);
             }
 
-        }, null, PositionImpl.LATEST);
+        }, null, PositionImpl.LATEST, null);
         ledger.asyncReadEntry(ledgerHandle, 
PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
                 opReadEntry, ctxStr);
         retryStrategically((test) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9df30095131..9d07d4278fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Predicate;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -48,6 +49,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -318,8 +320,20 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
                     minReplayedPosition = null;
                 }
 
-                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this,
-                        ReadType.Normal, topic.getMaxReadPosition());
+                // Filter out and skip reading delayed messages that exist in the DelayedDeliveryTracker
+                if (delayedDeliveryTracker.isPresent()) {
+                    Predicate<PositionImpl> skipCondition = null;
+                    final DelayedDeliveryTracker deliveryTracker = 
delayedDeliveryTracker.get();
+                    if (deliveryTracker instanceof 
BucketDelayedDeliveryTracker) {
+                        skipCondition = position -> deliveryTracker
+                                .containsMessage(position.getLedgerId(), 
position.getEntryId());
+                    }
+                    cursor.asyncReadEntriesWithSkipOrWait(messagesToRead, 
bytesToRead, this, ReadType.Normal,
+                            topic.getMaxReadPosition(), skipCondition);
+                } else {
+                    cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, 
this, ReadType.Normal,
+                            topic.getMaxReadPosition());
+                }
             } else {
                 log.debug("[{}] Cannot schedule next read until previous one 
is done", name);
             }

Reply via email to