poorbarcode commented on code in PR #23231:
URL: https://github.com/apache/pulsar/pull/23231#discussion_r1746447330


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +372,162 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             // ahead in the stream while the new consumers are not ready to 
accept the new messages,
             // therefore would be most likely only increase the distance 
between read-position and mark-delete
             // position.
-            isDispatcherStuckOnReplays = true;
+            skipNextReplayToTriggerLookAhead = true;
             return true;
-        }  else if (currentThreadKeyNumber == 0) {
+        } else if (entriesByConsumerForDispatching.size() == 0) {
+            // There should be a backoff delay before retrying
             return true;
         }
         return false;
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? 
extends Position> entries,
-           int availablePermits, ReadType readType, Set<Integer> 
stickyKeyHashes) {
-        int maxMessages = Math.min(entries.size(), availablePermits);
-        if (maxMessages == 0) {
-            return 0;
+    private boolean isLookAheadAllowed() {
+        if 
(serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+                || (recentlyJoinedConsumers == null || 
recentlyJoinedConsumers.isEmpty())) {
+            long keySharedNumberOfReplayMessagesThresholdForLookAhead = 
Math.max(
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+                            * consumerList.size(),
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+            if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+                    || redeliveryMessages.size() < 
keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+                return true;
+            }
         }
-        if (readType == ReadType.Normal && stickyKeyHashes != null
-                && 
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
-            // If redeliveryMessages contains messages that correspond to the 
same hash as the messages
-            // that the dispatcher is trying to send, do not send those 
messages for order guarantee
-            return 0;
+        return false;
+    }
+
+    // groups the entries by consumer and filters out the entries that should 
not be dispatched
+    // the entries are handled in the order they are received instead of first 
grouping them by consumer and
+    // then filtering them
+    private Map<Consumer, List<Entry>> 
filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+        // entries grouped by consumer
+        Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+        // permits for consumer, permits are for entries/batches
+        Map<Consumer, MutableInt> permitsForConsumer = new HashMap<>();
+
+        for (Entry entry : entries) {
+            int stickyKeyHash = getStickyKeyHash(entry);
+            Consumer consumer = selector.select(stickyKeyHash);
+            boolean dispatchEntry = false;
+            // a consumer was found for the sticky key hash and the entry can 
be dispatched
+            if (consumer != null && canDispatchEntry(consumer, entry, 
readType, stickyKeyHash)) {

Review Comment:
   > Addressing the consumer blocked issue: the logic has been changed to 
evaluate each message hash one-by-one. this is the code:
   
   - This change has a positive effect on a normal read.
   - **(Highlight)** But it has a negative effect on replay read, it also 
checks entries one by one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to