baomingyu commented on a change in pull request #10285:
URL: https://github.com/apache/pulsar/pull/10285#discussion_r643027360



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -322,17 +325,38 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
                 maxReadPosition = minReadPositionForRecentJoinedConsumer;
             }
         }
+
         // Here, the consumer is one that has recently joined, so we can only send messages that were
         // published before it has joined.
+        int messageNum = 0;
         for (int i = 0; i < maxMessages; i++) {
             if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                 // We have already crossed the divider line. All messages in the list are now
                 // newer than what we can currently dispatch to this consumer
-                return i;
+                messageNum  = i;
+                break;
             }
         }
 
-        return maxMessages;
+        if (messageNum == 0) {
+            messageNum = maxMessages;
+            CopyOnWriteArrayList<Consumer> consumers = this.getConsumers();
+            for (Consumer consumer1 : consumers) {
+                if (isApendMessage(consumer1, entries.get(0).getLedgerId(),
+                        entries.get(0).getEntryId())) {
+                    messageNum = 0;
+                    break;
+                }
+            }
+        }
+        return messageNum;
+    }
+
+    private boolean isApendMessage(Consumer consuemr, long ledgerId, long entryId) {
+        if (consuemr != null && consuemr.getPendingAcks().get(ledgerId, entryId) != null) {

Review comment:
       This problem occasionally occurs when consumers that use Key_Shared mode
are restarted.
   It has appeared many times in our production clusters. With this change
applied, the problem no longer occurs.
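
For context, the cut-off scan that this hunk modifies can be sketched in isolation. This is only a simplified stand-in, not the broker code: `CutoffScan`, `Pos`, and `countDispatchable` are hypothetical names, positions are reduced to (ledgerId, entryId) pairs, and the bound against `entries.size()` is an added guard that the hunk itself does not show.

```java
import java.util.Arrays;
import java.util.List;

public class CutoffScan {
    /** Hypothetical stand-in for a (ledgerId, entryId) position; not Pulsar's PositionImpl. */
    public static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        public Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            // Order by ledger first, then by entry within the ledger.
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    /**
     * Returns how many leading entries lie strictly before maxReadPosition,
     * capped at maxMessages. Assumes entries are sorted in ascending order.
     */
    public static int countDispatchable(List<Pos> entries, int maxMessages, Pos maxReadPosition) {
        // Added guard (assumption): never scan past the end of the list.
        int limit = Math.min(maxMessages, entries.size());
        for (int i = 0; i < limit; i++) {
            if (entries.get(i).compareTo(maxReadPosition) >= 0) {
                // Entry i is at or past the divider; only the first i entries qualify.
                return i;
            }
        }
        // No entry reached the divider; everything up to the limit qualifies.
        return limit;
    }

    public static void main(String[] args) {
        List<Pos> entries = Arrays.asList(new Pos(1, 0), new Pos(1, 1), new Pos(1, 2));
        System.out.println(countDispatchable(entries, 3, new Pos(1, 2))); // prints 2
        System.out.println(countDispatchable(entries, 3, new Pos(1, 0))); // prints 0
        System.out.println(countDispatchable(entries, 3, new Pos(2, 0))); // prints 3
    }
}
```

In this sketch a result of 0 means the first entry is already at or past the divider, which is a different case from the loop finishing without reaching the divider at all.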




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

