This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9ec1d071c71 [feat][broker][PIP-195]Implement Filter out all delayed
messages and skip them when reading messages from bookies - part7 (#19035)
9ec1d071c71 is described below
commit 9ec1d071c7188a2db694e9d7b359faaf33cb076e
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Jan 3 09:37:56 2023 +0800
[feat][broker][PIP-195]Implement Filter out all delayed messages and skip
them when reading messages from bookies - part7 (#19035)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 64 +++++++++++++++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 30 ++++++++--
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 33 +++++++++++
.../bookkeeper/mledger/impl/OpReadEntry.java | 28 +++++++--
.../org/apache/bookkeeper/mledger/impl/OpScan.java | 2 +-
.../mledger/impl/ManagedCursorContainerTest.java | 2 +-
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 67 +++++++++++++++++++++-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 4 +-
.../PersistentDispatcherMultipleConsumers.java | 18 +++++-
9 files changed, 230 insertions(+), 18 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 4167c040ed6..7802ed07781 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -167,6 +167,21 @@ public interface ManagedCursor {
void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes,
ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition);
+ /**
+ * Asynchronously read entries from the ManagedLedger.
+ *
+ * @param numberOfEntriesToRead maximum number of entries to return
+ * @param maxSizeBytes max size in bytes of the entries to return
+ * @param callback callback object
+ * @param ctx opaque context
+ * @param maxPosition max position can read
+ * @param skipCondition predicate of read filter out
+ */
+ default void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long
maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl
maxPosition, Predicate<PositionImpl> skipCondition) {
+ asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, callback, ctx,
maxPosition);
+ }
+
/**
* Get 'N'th entry from the mark delete position in the cursor without
updating any cursor positions.
*
@@ -264,6 +279,55 @@ public interface ManagedCursor {
void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes,
ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);
+ /**
+ * Asynchronously read entries from the ManagedLedger, up to the specified
number and size.
+ *
+ * <p/>If no entries are available, the callback will not be triggered.
Instead it will be registered to wait until
+ * a new message will be persisted into the managed ledger
+ *
+ * @see #readEntriesOrWait(int, long)
+ * @param maxEntries
+ * maximum number of entries to return
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ * @param maxPosition
+ * max position can read
+ * @param skipCondition
+ * predicate of read filter out
+ */
+ default void asyncReadEntriesWithSkipOrWait(int maxEntries,
ReadEntriesCallback callback, Object ctx,
+ PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
+ asyncReadEntriesOrWait(maxEntries, callback, ctx, maxPosition);
+ }
+
+ /**
+ * Asynchronously read entries from the ManagedLedger, up to the specified
number and size.
+ *
+ * <p/>If no entries are available, the callback will not be triggered.
Instead it will be registered to wait until
+ * a new message will be persisted into the managed ledger
+ *
+ * @see #readEntriesOrWait(int, long)
+ * @param maxEntries
+ * maximum number of entries to return
+ * @param maxSizeBytes
+ * max size in bytes of the entries to return
+ * @param callback
+ * callback object
+ * @param ctx
+ * opaque context
+ * @param maxPosition
+ * max position can read
+ * @param skipCondition
+ * predicate of read filter out
+ */
+ default void asyncReadEntriesWithSkipOrWait(int maxEntries, long
maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl
maxPosition,
+ Predicate<PositionImpl>
skipCondition) {
+ asyncReadEntriesOrWait(maxEntries, maxSizeBytes, callback, ctx,
maxPosition);
+ }
+
/**
* Cancel a previously scheduled asyncReadEntriesOrWait operation.
*
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 3af2e568d86..5b351c99649 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -762,13 +762,19 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void asyncReadEntries(final int numberOfEntriesToRead, final
ReadEntriesCallback callback,
- final Object ctx, PositionImpl maxPosition) {
+ final Object ctx, PositionImpl maxPosition) {
asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback,
ctx, maxPosition);
}
@Override
public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes,
ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
+ asyncReadEntriesWithSkip(numberOfEntriesToRead, maxSizeBytes,
callback, ctx, maxPosition, null);
+ }
+
+ @Override
+ public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long
maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
checkArgument(numberOfEntriesToRead > 0);
if (isClosed()) {
callback.readEntriesFailed(new ManagedLedgerException
@@ -779,7 +785,8 @@ public class ManagedCursorImpl implements ManagedCursor {
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead,
maxSizeBytes);
PENDING_READ_OPS_UPDATER.incrementAndGet(this);
- OpReadEntry op = OpReadEntry.create(this, readPosition,
numOfEntriesToRead, callback, ctx, maxPosition);
+ OpReadEntry op =
+ OpReadEntry.create(this, readPosition, numOfEntriesToRead,
callback, ctx, maxPosition, skipCondition);
ledger.asyncReadEntries(op);
}
@@ -901,6 +908,20 @@ public class ManagedCursorImpl implements ManagedCursor {
@Override
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes,
ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
+ asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback,
ctx, maxPosition, null);
+ }
+
+ @Override
+ public void asyncReadEntriesWithSkipOrWait(int maxEntries,
ReadEntriesCallback callback,
+ Object ctx, PositionImpl
maxPosition,
+ Predicate<PositionImpl>
skipCondition) {
+ asyncReadEntriesWithSkipOrWait(maxEntries, NO_MAX_SIZE_LIMIT,
callback, ctx, maxPosition, skipCondition);
+ }
+
+ @Override
+ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long
maxSizeBytes, ReadEntriesCallback callback,
+ Object ctx, PositionImpl
maxPosition,
+ Predicate<PositionImpl>
skipCondition) {
checkArgument(maxEntries > 0);
if (isClosed()) {
callback.readEntriesFailed(new
CursorAlreadyClosedException("Cursor was already closed"), ctx);
@@ -914,10 +935,11 @@ public class ManagedCursorImpl implements ManagedCursor {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately",
ledger.getName(), name);
}
- asyncReadEntries(numberOfEntriesToRead, callback, ctx,
maxPosition);
+ asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT,
callback, ctx,
+ maxPosition, skipCondition);
} else {
OpReadEntry op = OpReadEntry.create(this, readPosition,
numberOfEntriesToRead, callback,
- ctx, maxPosition);
+ ctx, maxPosition, skipCondition);
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 290e4bb530f..3a80d122272 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2054,6 +2054,39 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
long lastEntry = min(firstEntry +
opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
+ // Filer out and skip unnecessary read entry
+ if (opReadEntry.skipCondition != null) {
+ long firstValidEntry = -1L;
+ long lastValidEntry = -1L;
+ long entryId = firstEntry;
+ for (; entryId <= lastEntry; entryId++) {
+ if
(opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+ if (firstValidEntry == -1L) {
+ firstValidEntry = entryId;
+ }
+ } else {
+ if (firstValidEntry != -1L) {
+ break;
+ }
+ }
+
+ if (firstValidEntry != -1L) {
+ lastValidEntry = entryId;
+ }
+ }
+
+ // If all messages in [firstEntry...lastEntry] are filter out,
+ // then manual call internalReadEntriesComplete to advance read
position.
+ if (firstValidEntry == -1L) {
+
opReadEntry.internalReadEntriesComplete(Collections.emptyList(),
opReadEntry.ctx,
+ PositionImpl.get(ledger.getId(), lastEntry));
+ return;
+ }
+
+ firstEntry = firstValidEntry;
+ lastEntry = lastValidEntry;
+ }
+
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={}
last={}", name, ledger.getId(), firstEntry,
lastEntry);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 86290d8bc8f..04437b5320d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -22,7 +22,9 @@ import static
org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -45,8 +47,10 @@ class OpReadEntry implements ReadEntriesCallback {
private PositionImpl nextReadPosition;
PositionImpl maxPosition;
+ Predicate<PositionImpl> skipCondition;
+
public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl
readPositionRef, int count,
- ReadEntriesCallback callback, Object ctx, PositionImpl
maxPosition) {
+ ReadEntriesCallback callback, Object ctx, PositionImpl
maxPosition, Predicate<PositionImpl> skipCondition) {
OpReadEntry op = RECYCLER.get();
op.readPosition =
cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.cursor = cursor;
@@ -57,13 +61,13 @@ class OpReadEntry implements ReadEntriesCallback {
maxPosition = PositionImpl.LATEST;
}
op.maxPosition = maxPosition;
+ op.skipCondition = skipCondition;
op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
}
- @Override
- public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
+ void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx,
PositionImpl lastPosition) {
// Filter the returned entries for individual deleted messages
int entriesCount = returnedEntries.size();
long entriesSize = 0;
@@ -72,13 +76,19 @@ class OpReadEntry implements ReadEntriesCallback {
}
cursor.updateReadStats(entriesCount, entriesSize);
- final PositionImpl lastPosition = (PositionImpl)
returnedEntries.get(entriesCount - 1).getPosition();
+ if (entriesCount != 0) {
+ lastPosition = (PositionImpl) returnedEntries.get(entriesCount -
1).getPosition();
+ }
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Read entries succeeded batch_size={}
cumulative_size={} requested_count={}",
cursor.ledger.getName(), cursor.getName(),
returnedEntries.size(), entries.size(), count);
}
- List<Entry> filteredEntries =
cursor.filterReadEntries(returnedEntries);
- entries.addAll(filteredEntries);
+
+ List<Entry> filteredEntries = Collections.emptyList();
+ if (entriesCount != 0) {
+ filteredEntries = cursor.filterReadEntries(returnedEntries);
+ entries.addAll(filteredEntries);
+ }
// if entries have been filtered out then try to skip reading of
already deletedMessages in that range
final Position nexReadPosition = entriesCount != filteredEntries.size()
@@ -87,6 +97,11 @@ class OpReadEntry implements ReadEntriesCallback {
checkReadCompletion();
}
+ @Override
+ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
+ internalReadEntriesComplete(returnedEntries, ctx, null);
+ }
+
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object
ctx) {
cursor.readOperationCompleted();
@@ -190,6 +205,7 @@ class OpReadEntry implements ReadEntriesCallback {
nextReadPosition = null;
maxPosition = null;
recyclerHandle.recycle(this);
+ skipCondition = null;
}
private static final Logger log =
LoggerFactory.getLogger(OpReadEntry.class);
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 9f1cd310f92..6d68b042a7a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -128,7 +128,7 @@ class OpScan implements ReadEntriesCallback {
}
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor,
searchPosition, batchSize,
- this, OpScan.this.ctx, null);
+ this, OpScan.this.ctx, null, null);
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED,
OpScan.this.ctx);
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 54ebc4c293a..2c01b778caf 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -33,8 +33,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Predicate;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index fa3d6ec6c90..7e1731e573a 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -55,6 +55,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -4126,7 +4127,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor,
ledger.lastConfirmedEntry, 10, callback,
- null, PositionImpl.get(lastPosition.getLedgerId(), -1));
+ null, PositionImpl.get(lastPosition.getLedgerId(), -1), null);
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionImpl.EARLIEST);
@@ -4148,7 +4149,7 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
};
@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry =
Mockito.mockStatic(OpReadEntry.class);
- mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(),
anyInt(), any(), any(), any()))
+ mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(),
anyInt(), any(), any(), any(), any()))
.thenAnswer(__ -> createOpReadEntry.get());
final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
@@ -4252,5 +4253,67 @@ public class ManagedCursorTest extends
MockedBookKeeperTestCase {
factory2.shutdown();
}
+ @Test
+ public void testReadEntriesWithFilterOut() throws ManagedLedgerException,
InterruptedException, ExecutionException {
+ int readMaxNumber = 10;
+ int sendNumber = 20;
+ ManagedLedger ledger = factory.open("testReadEntriesWithFilter");
+ ManagedCursor cursor = ledger.openCursor("c");
+ Position position = PositionImpl.EARLIEST;
+ Position maxCanReadPosition = PositionImpl.EARLIEST;
+ for (int i = 0; i < sendNumber; i++) {
+ if (i == readMaxNumber - 1) {
+ position = ledger.addEntry(new byte[1024]);
+ } else if (i == sendNumber - 1) {
+ maxCanReadPosition = ledger.addEntry(new byte[1024]);
+ } else {
+ ledger.addEntry(new byte[1024]);
+ }
+
+ }
+ CompletableFuture<Integer> completableFuture = new
CompletableFuture<>();
+ cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new
ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ completableFuture.complete(entries.size());
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ completableFuture.completeExceptionally(exception);
+ }
+ }, null, (PositionImpl) position, pos -> {
+ return pos.getEntryId() % 2 != 0;
+ });
+
+ int number = completableFuture.get();
+ assertEquals(number, readMaxNumber / 2);
+
+ assertEquals(cursor.getReadPosition().getEntryId(), 10);
+
+ CompletableFuture<Integer> completableFuture2 = new
CompletableFuture<>();
+ cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new
ReadEntriesCallback() {
+ @Override
+ public void readEntriesComplete(List<Entry> entries, Object ctx) {
+ completableFuture2.complete(entries.size());
+ }
+
+ @Override
+ public void readEntriesFailed(ManagedLedgerException exception,
Object ctx) {
+ completableFuture2.completeExceptionally(exception);
+ }
+ }, null, (PositionImpl) maxCanReadPosition, pos -> {
+ return pos.getEntryId() % 2 != 0;
+ });
+
+ int number2 = completableFuture2.get();
+ assertEquals(number2, readMaxNumber / 2);
+
+ assertEquals(cursor.getReadPosition().getEntryId(), 20);
+
+ cursor.close();
+ ledger.close();
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1338c714c73..3b011fe8d56 100644
---
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -555,7 +555,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
public void readEntriesFailed(ManagedLedgerException
exception, Object ctx) {
}
- }, null, maxPosition);
+ }, null, maxPosition, null);
Assert.assertEquals(opReadEntry.readPosition, position);
}
@@ -3030,7 +3030,7 @@ public class ManagedLedgerTest extends
MockedBookKeeperTestCase {
responseException2.set(exception);
}
- }, null, PositionImpl.LATEST);
+ }, null, PositionImpl.LATEST, null);
ledger.asyncReadEntry(ledgerHandle,
PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
opReadEntry, ctxStr);
retryStrategically((test) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9df30095131..9d07d4278fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -48,6 +49,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
+import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -318,8 +320,20 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
minReplayedPosition = null;
}
- cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this,
- ReadType.Normal, topic.getMaxReadPosition());
+ // Filter out and skip read delayed messages exist in
DelayedDeliveryTracker
+ if (delayedDeliveryTracker.isPresent()) {
+ Predicate<PositionImpl> skipCondition = null;
+ final DelayedDeliveryTracker deliveryTracker =
delayedDeliveryTracker.get();
+ if (deliveryTracker instanceof
BucketDelayedDeliveryTracker) {
+ skipCondition = position -> deliveryTracker
+ .containsMessage(position.getLedgerId(),
position.getEntryId());
+ }
+ cursor.asyncReadEntriesWithSkipOrWait(messagesToRead,
bytesToRead, this, ReadType.Normal,
+ topic.getMaxReadPosition(), skipCondition);
+ } else {
+ cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead,
this, ReadType.Normal,
+ topic.getMaxReadPosition());
+ }
} else {
log.debug("[{}] Cannot schedule next read until previous one
is done", name);
}