This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ec4e3d0cf9 [fix][broker] Skip reading more entries for a pending read
with no more entries (#16400)
5ec4e3d0cf9 is described below
commit 5ec4e3d0cf977f7473cb4be1272699b11ad9e4a6
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 13 13:13:34 2022 +0800
[fix][broker] Skip reading more entries for a pending read with no more
entries (#16400)
### Motivation
Related issue: https://github.com/streamnative/kop/issues/1379
KoP uses reader on a single partition of a compacted topic and we
observed a lot of logs like:
> Error reading entries at 928511:1 : We can only have a single waiting
callback
It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns
false, `asyncReadEntriesOrWait` is called for multiple times before
`cancelPendingReadRequest` or new messages arrived.
### Modifications
Throw a `ConcurrentWaitCallbackException` instead of a raw
`ManagedLedgerException` when there are more wait callbacks. Then check
this exception type and skip the following steps in
`PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
---
.../org/apache/bookkeeper/mledger/ManagedLedgerException.java | 7 +++++++
.../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +--
.../persistent/PersistentDispatcherSingleActiveConsumer.java | 8 ++++++++
3 files changed, 16 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 26cc9e12659..7046ba48193 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -182,6 +182,13 @@ public class ManagedLedgerException extends Exception {
}
}
+ public static class ConcurrentWaitCallbackException extends
ManagedLedgerException {
+
+ public ConcurrentWaitCallbackException() {
+ super("We can only have a single waiting callback");
+ }
+ }
+
@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index dbeee2fd373..2e50c712614 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -858,8 +858,7 @@ public class ManagedCursorImpl implements ManagedCursor {
if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
- callback.readEntriesFailed(new ManagedLedgerException("We can
only have a single waiting callback"),
- ctx);
+ callback.readEntriesFailed(new
ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
return;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index a0af118f6e6..022a86ce9a0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -32,6 +32,7 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import
org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -352,6 +353,7 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name,
consumer, messagesToRead);
}
+
synchronized (this) {
havePendingRead = true;
if (consumer.readCompacted()) {
@@ -481,6 +483,12 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
Consumer c = readEntriesCtx.getConsumer();
readEntriesCtx.recycle();
+ if (exception instanceof ConcurrentWaitCallbackException) {
+ // At most one pending read request is allowed when there are no
more entries, we should not trigger more
+ // read operations in this case and just wait the existing read
operation completes.
+ return;
+ }
+
long waitTimeMillis = readFailureBackoff.next();
if (exception instanceof NoMoreEntriesToReadException) {