This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b9504f460d944cc7ee2a4ad8cfbbe50102901228 Author: Yunze Xu <[email protected]> AuthorDate: Wed Jul 13 13:13:34 2022 +0800 [fix][broker] Skip reading more entries for a pending read with no more entries (#16400) ### Motivation Related issue: https://github.com/streamnative/kop/issues/1379 KoP uses a reader on a single partition of a compacted topic and we observed a lot of logs like: > Error reading entries at 928511:1 : We can only have a single waiting callback It happens on a `ManagedCursorImpl` when `hasMoreEntries` returns false and `asyncReadEntriesOrWait` is called multiple times before `cancelPendingReadRequest` is called or new messages arrive. ### Modifications Throw a `ConcurrentWaitCallbackException` instead of a raw `ManagedLedgerException` when a waiting callback is already pending. Then check this exception type and skip the following steps in `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`. (cherry picked from commit 5ec4e3d0cf977f7473cb4be1272699b11ad9e4a6) --- .../org/apache/bookkeeper/mledger/ManagedLedgerException.java | 7 +++++++ .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +-- .../persistent/PersistentDispatcherSingleActiveConsumer.java | 8 ++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 26cc9e12659..7046ba48193 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -182,6 +182,13 @@ public class ManagedLedgerException extends Exception { } } + public static class ConcurrentWaitCallbackException extends ManagedLedgerException { + + public ConcurrentWaitCallbackException() { + super("We can only have a single waiting callback"); + } + } + @Override public synchronized 
Throwable fillInStackTrace() { // Disable stack traces to be filled in diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 87850624751..863f1cc9fd5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -859,8 +859,7 @@ public class ManagedCursorImpl implements ManagedCursor { if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { op.recycle(); - callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"), - ctx); + callback.readEntriesFailed(new ManagedLedgerException.ConcurrentWaitCallbackException(), ctx); return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 1569037b9e3..79f397af883 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -352,6 +353,7 @@ public class 
PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher if (log.isDebugEnabled()) { log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead); } + synchronized (this) { havePendingRead = true; if (consumer.readCompacted()) { @@ -481,6 +483,12 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher Consumer c = readEntriesCtx.getConsumer(); readEntriesCtx.recycle(); + if (exception instanceof ConcurrentWaitCallbackException) { + // At most one pending read request is allowed when there are no more entries, we should not trigger more + // read operations in this case and just wait the existing read operation completes. + return; + } + long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof NoMoreEntriesToReadException) {
