This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f154e  Fix 8115 Some partitions get stuck after adding additional 
consumers to the KEY_SHARED subscriptions (#10096)
c4f154e is described below

commit c4f154e79c03cff9055aa4e2ede7748c5952f2bc
Author: baomingyu <[email protected]>
AuthorDate: Thu Apr 8 09:51:39 2021 +0800

    Fix 8115 Some partitions get stuck after adding additional consumers to the 
KEY_SHARED subscriptions (#10096)
    
    Fixes #8115
    
    Master Issue: #8115
    
    ### Motivation
    
    First point:
     Sometimes the send future does not complete successfully, so the condition
     below is never satisfied and readMoreEntries is not called:
    ` if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                            readMoreEntries();
     } `
    
    Second point:
      Sometimes keyNumbers is never decremented to zero, so the broker does not
      start the next loop to readMoreEntries.
    Some topic partitions then get stuck and stop pushing messages to
    consumers, even though the consumers still have permits.
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3107f13..d27df30 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -171,6 +172,10 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
 
+        int currentThreadKeyNumber = groupedEntries.size();
+        if (currentThreadKeyNumber == 0) {
+            currentThreadKeyNumber = -1;
+        }
         for (Map.Entry<Consumer, List<Entry>> current : 
groupedEntries.entrySet()) {
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
@@ -214,7 +219,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                         sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), 
sendMessageInfo.getTotalChunkedMessages(),
                         getRedeliveryTracker()).addListener(future -> {
-                    if (future.isSuccess() && keyNumbers.decrementAndGet() == 
0) {
+                    if (future.isDone() && keyNumbers.decrementAndGet() == 0) {
                         readMoreEntries();
                     }
                 });
@@ -223,6 +228,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                         -(sendMessageInfo.getTotalMessages() - 
batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
+            } else {
+                currentThreadKeyNumber = keyNumbers.decrementAndGet();
             }
         }
 
@@ -260,6 +267,12 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             // readMoreEntries should run regardless whether or not stuck is 
caused by
             // stuckConsumers for avoid stopping dispatch.
             readMoreEntries();
+        }  else if (currentThreadKeyNumber == 0) {
+            topic.getBrokerService().executor().schedule(() -> {
+                synchronized 
(PersistentStickyKeyDispatcherMultipleConsumers.this) {
+                    readMoreEntries();
+                }
+            }, 100, TimeUnit.MILLISECONDS);
         }
     }
 

Reply via email to