baomingyu commented on a change in pull request #10285:
URL: https://github.com/apache/pulsar/pull/10285#discussion_r643025169



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -322,17 +325,38 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
                 maxReadPosition = minReadPositionForRecentJoinedConsumer;
             }
         }
+
         // Here, the consumer is one that has recently joined, so we can only send messages that were
         // published before it has joined.
+        int messageNum = 0;
         for (int i = 0; i < maxMessages; i++) {
             if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                 // We have already crossed the divider line. All messages in the list are now
                 // newer than what we can currently dispatch to this consumer
-                return i;
+                messageNum  = i;
+                break;
             }
         }
 
-        return maxMessages;
+        if (messageNum == 0) {
+            messageNum = maxMessages;
+            CopyOnWriteArrayList<Consumer> consumers = this.getConsumers();
+            for (Consumer consumer1 : consumers) {
+                if (isApendMessage(consumer1, entries.get(0).getLedgerId(),
+                        entries.get(0).getEntryId())) {
+                    messageNum = 0;
+                    break;
+                }
+            }
+        }
+        return messageNum;
+    }
+
+    private boolean isApendMessage(Consumer consuemr, long ledgerId, long entryId) {
+        if (consuemr != null && consuemr.getPendingAcks().get(ledgerId, entryId) != null) {

Review comment:
       > @baomingyu Seems the description of this PR is not proper to explain 
how to fix this problem, could you please provide more context about why the 
`getRestrictedMaxEntriesForConsumer` will always return 0 after dispatch 
messages to a consumer that without any flow permits?
   
   This problem is mainly caused by a timing gap: a consumer is created first
and receives its flow permits later, so the two are not processed at the same
time.
   For example, during the first dispatch there are two consumers in the
consumer list, A and B. Consumer A has 1000 permits, while consumer B has not
yet received the flow command, so it has 0 permits. Messages are then
dispatched in key_shared mode. If all of these messages are assigned to
consumer B by key, they are not actually pushed to consumer B. In the next
round of dispatch, consumer B has received the flow command and its permits
have changed to 1000. Without this change, the code only compares against the
position reached in the previous round, so it still does not push the messages
to consumer B. The symptom is that the broker holds messages but does not push
them to the consumer.
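
To make the stall concrete, here is a minimal sketch of the pre-change loop from the diff above. It is a simplified model, not the broker code: positions are plain `long` values, and the class name, the entry positions, and the recorded `maxReadPositionForB` are all assumptions chosen for illustration.

```java
import java.util.List;

public class RestrictedEntriesSketch {
    // Mirrors the pre-change loop: return the index of the first entry whose
    // position is at or after maxReadPosition, or maxMessages if none is.
    static int restrictedMaxEntries(List<Long> entryPositions, long maxReadPosition, int maxMessages) {
        for (int i = 0; i < maxMessages; i++) {
            if (entryPositions.get(i) >= maxReadPosition) {
                return i; // crossed the divider line
            }
        }
        return maxMessages;
    }

    public static void main(String[] args) {
        // Hypothetical entries whose keys all hash to consumer B.
        List<Long> entries = List.of(5L, 6L, 7L);
        // Hypothetical position recorded for B when it joined.
        long maxReadPositionForB = 5L;
        // Round 1: B has 0 permits. Round 2: B has 1000 permits.
        // The loop never looks at permits, so both rounds give the same answer.
        for (int round = 1; round <= 2; round++) {
            int n = restrictedMaxEntries(entries, maxReadPositionForB, entries.size());
            System.out.println("round " + round + ": dispatchable entries = " + n);
        }
    }
}
```

In this model both rounds report 0 dispatchable entries, because the first entry is already at the divider line regardless of whether B has permits.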
   
   The fix is:
   When the getRestrictedMaxEntriesForConsumer method reaches the final
messageNum == 0 check, it needs to determine whether the current message has
already been successfully pushed to a consumer. If it has not been pushed to
any consumer, it needs to be pushed again.
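
The check described above can be sketched in isolation as follows. This is an illustration under stated assumptions, not the code in the PR: the class name, the `alreadyPushed` and `adjust` helpers, and the map of pending-ack positions per consumer are hypothetical stand-ins for `Consumer.getPendingAcks()`.

```java
import java.util.Map;
import java.util.Set;

public class PendingAckCheckSketch {
    // True if any consumer already holds the entry at this position as a
    // pending ack, meaning it was really pushed to someone.
    static boolean alreadyPushed(Map<String, Set<Long>> pendingAcksByConsumer, long position) {
        for (Set<Long> pendingAcks : pendingAcksByConsumer.values()) {
            if (pendingAcks.contains(position)) {
                return true;
            }
        }
        return false;
    }

    // Applies the extra step only when the restricted count came out as 0:
    // keep 0 if the first entry was already pushed, otherwise allow maxMessages.
    static int adjust(int restricted, int maxMessages,
                      Map<String, Set<Long>> pendingAcksByConsumer, long firstPosition) {
        if (restricted != 0) {
            return restricted;
        }
        return alreadyPushed(pendingAcksByConsumer, firstPosition) ? 0 : maxMessages;
    }

    public static void main(String[] args) {
        // Neither consumer holds position 5, so it was never pushed.
        Map<String, Set<Long>> nobodyHasIt = Map.of("A", Set.<Long>of(), "B", Set.<Long>of());
        // Consumer A holds position 5 as a pending ack.
        Map<String, Set<Long>> aHasIt = Map.of("A", Set.of(5L), "B", Set.<Long>of());
        System.out.println(adjust(0, 3, nobodyHasIt, 5L)); // never pushed: dispatch again
        System.out.println(adjust(0, 3, aHasIt, 5L));      // already pushed: stay at 0
    }
}
```

Looking only at the first entry keeps the check cheap, at the cost of assuming the rest of the batch is in the same state as its first entry.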
   



