This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ec4e3d0cf9 [fix][broker] Skip reading more entries for a pending read 
with no more entries (#16400)
5ec4e3d0cf9 is described below

commit 5ec4e3d0cf977f7473cb4be1272699b11ad9e4a6
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 13 13:13:34 2022 +0800

    [fix][broker] Skip reading more entries for a pending read with no more 
entries (#16400)
    
    ### Motivation
    
    Related issue: https://github.com/streamnative/kop/issues/1379
    
    KoP uses reader on a single partition of a compacted topic and we
    observed a lot of logs like:
    
    > Error reading entries at 928511:1 : We can only have a single waiting 
callback
    
    It happened on a `ManagedCursorImpl` when `hasMoreEntries` returns
    false, `asyncReadEntriesOrWait` is called for multiple times before
    `cancelPendingReadRequest` or new messages arrived.
    
    ### Modifications
    
    Throw a `ConcurrentWaitCallbackException` instead of a raw
    `ManagedLedgerException` when there are more wait callbacks. Then check
    this exception type and skip the following steps in
    `PersistentDispatcherSingleActiveConsumer#internalReadEntriesFailed`.
---
 .../org/apache/bookkeeper/mledger/ManagedLedgerException.java     | 7 +++++++
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java     | 3 +--
 .../persistent/PersistentDispatcherSingleActiveConsumer.java      | 8 ++++++++
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 26cc9e12659..7046ba48193 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -182,6 +182,13 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class ConcurrentWaitCallbackException extends 
ManagedLedgerException {
+
+        public ConcurrentWaitCallbackException() {
+            super("We can only have a single waiting callback");
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index dbeee2fd373..2e50c712614 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -858,8 +858,7 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                 op.recycle();
-                callback.readEntriesFailed(new ManagedLedgerException("We can 
only have a single waiting callback"),
-                        ctx);
+                callback.readEntriesFailed(new 
ManagedLedgerException.ConcurrentWaitCallbackException(), ctx);
                 return;
             }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index a0af118f6e6..022a86ce9a0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -32,6 +32,7 @@ import 
org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -352,6 +353,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] Schedule read of {} messages", name, 
consumer, messagesToRead);
             }
+
             synchronized (this) {
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
@@ -481,6 +483,12 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         Consumer c = readEntriesCtx.getConsumer();
         readEntriesCtx.recycle();
 
+        if (exception instanceof ConcurrentWaitCallbackException) {
+            // At most one pending read request is allowed when there are no 
more entries, we should not trigger more
+            // read operations in this case and just wait the existing read 
operation completes.
+            return;
+        }
+
         long waitTimeMillis = readFailureBackoff.next();
 
         if (exception instanceof NoMoreEntriesToReadException) {

Reply via email to