lhotari commented on code in PR #23231:
URL: https://github.com/apache/pulsar/pull/23231#discussion_r1746499955


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +372,162 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             // ahead in the stream while the new consumers are not ready to 
accept the new messages,
             // therefore would be most likely only increase the distance 
between read-position and mark-delete
             // position.
-            isDispatcherStuckOnReplays = true;
+            skipNextReplayToTriggerLookAhead = true;
             return true;
-        }  else if (currentThreadKeyNumber == 0) {
+        } else if (entriesByConsumerForDispatching.size() == 0) {
+            // There should be a backoff delay before retrying
             return true;
         }
         return false;
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? 
extends Position> entries,
-           int availablePermits, ReadType readType, Set<Integer> 
stickyKeyHashes) {
-        int maxMessages = Math.min(entries.size(), availablePermits);
-        if (maxMessages == 0) {
-            return 0;
+    private boolean isLookAheadAllowed() {
+        if 
(serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+                || (recentlyJoinedConsumers == null || 
recentlyJoinedConsumers.isEmpty())) {
+            long keySharedNumberOfReplayMessagesThresholdForLookAhead = 
Math.max(
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+                            * consumerList.size(),
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+            if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+                    || redeliveryMessages.size() < 
keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+                return true;
+            }
         }
-        if (readType == ReadType.Normal && stickyKeyHashes != null
-                && 
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
-            // If redeliveryMessages contains messages that correspond to the 
same hash as the messages
-            // that the dispatcher is trying to send, do not send those 
messages for order guarantee
-            return 0;
+        return false;
+    }
+
+    // groups the entries by consumer and filters out the entries that should 
not be dispatched
+    // the entries are handled in the order they are received instead of first 
grouping them by consumer and
+    // then filtering them
+    private Map<Consumer, List<Entry>> 
filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+        // entries grouped by consumer
+        Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+        // permits for consumer, permits are for entries/batches
+        Map<Consumer, MutableInt> permitsForConsumer = new HashMap<>();
+
+        for (Entry entry : entries) {
+            int stickyKeyHash = getStickyKeyHash(entry);
+            Consumer consumer = selector.select(stickyKeyHash);
+            boolean dispatchEntry = false;
+            // a consumer was found for the sticky key hash and the entry can 
be dispatched
+            if (consumer != null && canDispatchEntry(consumer, entry, 
readType, stickyKeyHash)) {

Review Comment:
   > But it has a negative effect on replay read, it also checks entries one by 
one.
   
   @poorbarcode what is the negative effect? Checking entries one by one by 
calling a separate method isn't inefficient in this case. The JVM's JIT 
compiler can optimize such calls so that they run very efficiently. It would 
be a different matter if I/O or locks were involved; in those cases, there 
would be a benefit to batching.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to