merlimat commented on code in PR #23231:
URL: https://github.com/apache/pulsar/pull/23231#discussion_r1737972245


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +373,159 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             // ahead in the stream while the new consumers are not ready to 
accept the new messages,
             // therefore would be most likely only increase the distance 
between read-position and mark-delete
             // position.
-            isDispatcherStuckOnReplays = true;
+            skipNextReplayToTriggerLookAhead = true;
             return true;
-        }  else if (currentThreadKeyNumber == 0) {
+        } else if (entriesByConsumerForDispatching.size() == 0) {
+            // There should be a backoff delay before retrying
             return true;
         }
         return false;
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? 
extends Position> entries,
-           int availablePermits, ReadType readType, Set<Integer> 
stickyKeyHashes) {
-        int maxMessages = Math.min(entries.size(), availablePermits);
-        if (maxMessages == 0) {
-            return 0;
+    private boolean isLookAheadAllowed() {
+        if 
(serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+                || (recentlyJoinedConsumers == null || 
recentlyJoinedConsumers.isEmpty())) {
+            long keySharedNumberOfReplayMessagesThresholdForLookAhead = 
Math.max(
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+                            * consumerList.size(),
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+            if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+                    || redeliveryMessages.size() < 
keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+                return true;
+            }
         }
-        if (readType == ReadType.Normal && stickyKeyHashes != null
-                && 
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
-            // If redeliveryMessages contains messages that correspond to the 
same hash as the messages
-            // that the dispatcher is trying to send, do not send those 
messages for order guarantee
-            return 0;
+        return false;
+    }
+
+    // groups the entries by consumer and filters out the entries that should 
not be dispatched
+    // the entries are handled in the order they are received instead of first 
grouping them by consumer and
+    // then filtering them
+    private Map<Consumer, List<Entry>> 
filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+        // entries grouped by consumer
+        Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+        // consumers for which all remaining entries should be discarded
+        Set<Consumer> remainingEntriesFilteredForConsumer = new HashSet<>();
+
+        for (Entry entry : entries) {
+            int stickyKeyHash = getStickyKeyHash(entry);
+            Consumer consumer = selector.select(stickyKeyHash);
+            boolean dispatchEntry = false;
+            // a consumer was found for the sticky key hash and the consumer 
can get more entries
+            if (consumer != null && 
!remainingEntriesFilteredForConsumer.contains(consumer)) {
+                dispatchEntry = canDispatchEntry(consumer, entry, readType, 
stickyKeyHash);
+            }
+            if (dispatchEntry) {
+                // add the entry to consumer's entry list for dispatching
+                List<Entry> consumerEntries =
+                        entriesGroupedByConsumer.computeIfAbsent(consumer, k 
-> new ArrayList<>());
+                consumerEntries.add(entry);
+            } else {
+                // add the message to replay
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
+                // release the entry as it will not be dispatched
+                entry.release();
+                // stop sending further entries to this consumer so that 
ordering is preserved
+                remainingEntriesFilteredForConsumer.add(consumer);
+            }
+        }
+        return entriesGroupedByConsumer;
+    }
+
+    // checks if the entry can be dispatched to the consumer
+    private boolean canDispatchEntry(Consumer consumer, Entry entry, ReadType 
readType, int stickyKeyHash) {
+        // check if the entry can be replayed to a recently joined consumer
+        Position maxLastSentPosition = 
resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType);
+        if (maxLastSentPosition != null && 
entry.getPosition().compareTo(maxLastSentPosition) > 0) {
+            return false;
+        }
+
+        // If redeliveryMessages contains messages that correspond to the same 
hash as the entry to be dispatched
+        // do not send those messages for order guarantee
+        if (readType == ReadType.Normal && 
redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
+            return false;
         }
+
+        return true;
+    }
+
+    /**
+     * Creates a filter for replaying messages. The filter is stateful and 
shouldn't be cached or reused.
+     * @see PersistentDispatcherMultipleConsumers#createFilterForReplay()
+     */
+    @Override
+    protected Predicate<Position> createFilterForReplay() {
+        return new ReplayPositionFilter();
+    }
+
+    /**
+     * Filter for replaying messages. The filter is stateful for a single 
invocation and shouldn't be cached, shared
+     * or reused. This is a short-lived object, and optimizing it for the "no 
garbage" coding style of Pulsar is
+     * unnecessary since the JVM can optimize allocations for short-lived 
objects.
+     */
+    private class ReplayPositionFilter implements Predicate<Position> {
+        // tracks the available permits for each consumer for the duration of 
the filter usage
+        // the filter is stateful and shouldn't be shared or reused later
+        private final Map<Consumer, MutableInt> availablePermitsMap = new 
HashMap<>();
+
+        @Override
+        public boolean test(Position position) {
+            // if out of order delivery is allowed, then any position will be 
replayed
+            if (isAllowOutOfOrderDelivery()) {
+                return true;
+            }
+            // lookup the sticky key hash for the entry at the replay position
+            Long stickyKeyHash = 
redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId());
+            if (stickyKeyHash == null) {
+                // the sticky key hash is missing for delayed messages, the 
filtering will happen at the time of
+                // dispatch after reading the entry from the ledger
+                log.debug("[{}] replay of entry at position {} doesn't contain 
sticky key hash.", name, position);

Review Comment:
   nit: 
   
   ```suggestion
                   if (log.isDebugEnabled()) {
                       log.debug("[{}] replay of entry at position {} doesn't 
contain sticky key hash.", name, position);
                   }
   ```



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -949,6 +949,48 @@ The max allowed delay for delayed delivery (in 
milliseconds). If the broker rece
             + " back and unack count reaches to `limit/2`. Using a value of 0, 
is disabling unackedMessage-limit"
             + " check and broker doesn't block dispatchers")
     private int maxUnackedMessagesPerBroker = 0;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "For Key_Shared subscriptions, if messages cannot be 
dispatched to consumers due to a slow consumer"
+                    + " or a blocked key hash (because of ordering 
constraints), the broker will continue reading more"
+                    + " messages from the backlog and attempt to dispatch them 
to consumers until the number of replay"
+                    + " messages reaches the calculated threshold.\n"
+                    + "Formula: threshold = 
max(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
+                    + " connected consumer count, 
keySharedLookAheadMsgInReplayThresholdPerSubscription)"
+                    + ".\n"
+                    + "Setting this value to 0 will disable the limit 
calculated per consumer.",
+            dynamic = true
+    )
+    private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 1000;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "For Key_Shared subscriptions, if messages cannot be 
dispatched to consumers due to a slow consumer"
+                    + " or a blocked key hash (because of ordering 
constraints), the broker will continue reading more"
+                    + " messages from the backlog and attempt to dispatch them 
to consumers until the number of replay"
+                    + " messages reaches the calculated threshold.\n"
+                    + "Formula: threshold = 
max(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
+                    + " connected consumer count, 
keySharedLookAheadMsgInReplayThresholdPerSubscription)"
+                    + ".\n"
+                    + "This value should be set to a value less than 2 * 
managedLedgerMaxUnackedRangesToPersist.\n"
+                    + "Setting this value to 0 will disable the limit 
calculated per subscription.\n",
+            dynamic = true
+    )
+    private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 10000;
+
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "For Key_Shared subscriptions, if messages cannot be 
dispatched to consumers due to a slow consumer"
+                    + " or a blocked key hash (because of ordering 
constraints), the broker will continue reading more"
+                    + " messages from the backlog and attempt to dispatch them 
to consumers until the number of replay"
+                    + " messages reaches the calculated threshold.\n"
+                    + "This setting controls whether look ahead is enabled 
when recently joined consumers are present.",
+            dynamic = true
+    )
+    private boolean keySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist 
= false;

Review Comment:
   Is this going to change the existing behavior with the default settings?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +373,159 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             // ahead in the stream while the new consumers are not ready to 
accept the new messages,
             // therefore would be most likely only increase the distance 
between read-position and mark-delete
             // position.
-            isDispatcherStuckOnReplays = true;
+            skipNextReplayToTriggerLookAhead = true;
             return true;
-        }  else if (currentThreadKeyNumber == 0) {
+        } else if (entriesByConsumerForDispatching.size() == 0) {
+            // There should be a backoff delay before retrying
             return true;
         }
         return false;
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? 
extends Position> entries,
-           int availablePermits, ReadType readType, Set<Integer> 
stickyKeyHashes) {
-        int maxMessages = Math.min(entries.size(), availablePermits);
-        if (maxMessages == 0) {
-            return 0;
+    private boolean isLookAheadAllowed() {
+        if 
(serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+                || (recentlyJoinedConsumers == null || 
recentlyJoinedConsumers.isEmpty())) {
+            long keySharedNumberOfReplayMessagesThresholdForLookAhead = 
Math.max(
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+                            * consumerList.size(),
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+            if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+                    || redeliveryMessages.size() < 
keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+                return true;
+            }
         }
-        if (readType == ReadType.Normal && stickyKeyHashes != null
-                && 
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
-            // If redeliveryMessages contains messages that correspond to the 
same hash as the messages
-            // that the dispatcher is trying to send, do not send those 
messages for order guarantee
-            return 0;
+        return false;
+    }
+
+    // groups the entries by consumer and filters out the entries that should 
not be dispatched
+    // the entries are handled in the order they are received instead of first 
grouping them by consumer and
+    // then filtering them
+    private Map<Consumer, List<Entry>> 
filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+        // entries grouped by consumer
+        Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+        // consumers for which all remaining entries should be discarded
+        Set<Consumer> remainingEntriesFilteredForConsumer = new HashSet<>();
+
+        for (Entry entry : entries) {
+            int stickyKeyHash = getStickyKeyHash(entry);
+            Consumer consumer = selector.select(stickyKeyHash);
+            boolean dispatchEntry = false;
+            // a consumer was found for the sticky key hash and the consumer 
can get more entries
+            if (consumer != null && 
!remainingEntriesFilteredForConsumer.contains(consumer)) {
+                dispatchEntry = canDispatchEntry(consumer, entry, readType, 
stickyKeyHash);
+            }
+            if (dispatchEntry) {
+                // add the entry to consumer's entry list for dispatching
+                List<Entry> consumerEntries =
+                        entriesGroupedByConsumer.computeIfAbsent(consumer, k 
-> new ArrayList<>());
+                consumerEntries.add(entry);
+            } else {
+                // add the message to replay
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
+                // release the entry as it will not be dispatched
+                entry.release();
+                // stop sending further entries to this consumer so that 
ordering is preserved
+                remainingEntriesFilteredForConsumer.add(consumer);
+            }
+        }
+        return entriesGroupedByConsumer;
+    }
+
+    // checks if the entry can be dispatched to the consumer
+    private boolean canDispatchEntry(Consumer consumer, Entry entry, ReadType 
readType, int stickyKeyHash) {
+        // check if the entry can be replayed to a recently joined consumer
+        Position maxLastSentPosition = 
resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType);
+        if (maxLastSentPosition != null && 
entry.getPosition().compareTo(maxLastSentPosition) > 0) {
+            return false;
+        }
+
+        // If redeliveryMessages contains messages that correspond to the same 
hash as the entry to be dispatched
+        // do not send those messages for order guarantee
+        if (readType == ReadType.Normal && 
redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
+            return false;
         }
+
+        return true;
+    }
+
+    /**
+     * Creates a filter for replaying messages. The filter is stateful and 
shouldn't be cached or reused.
+     * @see PersistentDispatcherMultipleConsumers#createFilterForReplay()
+     */
+    @Override
+    protected Predicate<Position> createFilterForReplay() {
+        return new ReplayPositionFilter();
+    }
+
+    /**
+     * Filter for replaying messages. The filter is stateful for a single 
invocation and shouldn't be cached, shared
+     * or reused. This is a short-lived object, and optimizing it for the "no 
garbage" coding style of Pulsar is
+     * unnecessary since the JVM can optimize allocations for short-lived 
objects.
+     */
+    private class ReplayPositionFilter implements Predicate<Position> {
+        // tracks the available permits for each consumer for the duration of 
the filter usage
+        // the filter is stateful and shouldn't be shared or reused later
+        private final Map<Consumer, MutableInt> availablePermitsMap = new 
HashMap<>();
+
+        @Override
+        public boolean test(Position position) {
+            // if out of order delivery is allowed, then any position will be 
replayed
+            if (isAllowOutOfOrderDelivery()) {
+                return true;
+            }
+            // lookup the sticky key hash for the entry at the replay position
+            Long stickyKeyHash = 
redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId());
+            if (stickyKeyHash == null) {
+                // the sticky key hash is missing for delayed messages, the 
filtering will happen at the time of
+                // dispatch after reading the entry from the ledger
+                log.debug("[{}] replay of entry at position {} doesn't contain 
sticky key hash.", name, position);
+                return true;
+            }
+            // find the consumer for the sticky key hash
+            Consumer consumer = selector.select(stickyKeyHash.intValue());
+            // skip replaying the message position if there's no assigned 
consumer
+            if (consumer == null) {
+                return false;
+            }
+            // lookup the available permits for the consumer
+            MutableInt availablePermits =
+                    availablePermitsMap.computeIfAbsent(consumer,
+                            k -> new 
MutableInt(getAvailablePermits(consumer)));
+            // skip replaying the message position if the consumer has no 
available permits
+            if (availablePermits.intValue() <= 0) {
+                return false;

Review Comment:
   If we stop here because one consumer has no permits, wouldn't that stop the 
replay for all consumers? Or is this filter's context scoped to a single consumer?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +373,159 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
             // ahead in the stream while the new consumers are not ready to 
accept the new messages,
             // therefore would be most likely only increase the distance 
between read-position and mark-delete
             // position.
-            isDispatcherStuckOnReplays = true;
+            skipNextReplayToTriggerLookAhead = true;
             return true;
-        }  else if (currentThreadKeyNumber == 0) {
+        } else if (entriesByConsumerForDispatching.size() == 0) {
+            // There should be a backoff delay before retrying
             return true;
         }
         return false;
     }
 
-    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? 
extends Position> entries,
-           int availablePermits, ReadType readType, Set<Integer> 
stickyKeyHashes) {
-        int maxMessages = Math.min(entries.size(), availablePermits);
-        if (maxMessages == 0) {
-            return 0;
+    private boolean isLookAheadAllowed() {
+        if 
(serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+                || (recentlyJoinedConsumers == null || 
recentlyJoinedConsumers.isEmpty())) {
+            long keySharedNumberOfReplayMessagesThresholdForLookAhead = 
Math.max(
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+                            * consumerList.size(),
+                    
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+            if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+                    || redeliveryMessages.size() < 
keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+                return true;
+            }
         }
-        if (readType == ReadType.Normal && stickyKeyHashes != null
-                && 
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
-            // If redeliveryMessages contains messages that correspond to the 
same hash as the messages
-            // that the dispatcher is trying to send, do not send those 
messages for order guarantee
-            return 0;
+        return false;
+    }
+
+    // groups the entries by consumer and filters out the entries that should 
not be dispatched
+    // the entries are handled in the order they are received instead of first 
grouping them by consumer and
+    // then filtering them
+    private Map<Consumer, List<Entry>> 
filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+        // entries grouped by consumer
+        Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+        // consumers for which all remaining entries should be discarded
+        Set<Consumer> remainingEntriesFilteredForConsumer = new HashSet<>();
+
+        for (Entry entry : entries) {
+            int stickyKeyHash = getStickyKeyHash(entry);
+            Consumer consumer = selector.select(stickyKeyHash);
+            boolean dispatchEntry = false;
+            // a consumer was found for the sticky key hash and the consumer 
can get more entries
+            if (consumer != null && 
!remainingEntriesFilteredForConsumer.contains(consumer)) {
+                dispatchEntry = canDispatchEntry(consumer, entry, readType, 
stickyKeyHash);
+            }
+            if (dispatchEntry) {
+                // add the entry to consumer's entry list for dispatching
+                List<Entry> consumerEntries =
+                        entriesGroupedByConsumer.computeIfAbsent(consumer, k 
-> new ArrayList<>());
+                consumerEntries.add(entry);
+            } else {
+                // add the message to replay
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), 
stickyKeyHash);
+                // release the entry as it will not be dispatched
+                entry.release();
+                // stop sending further entries to this consumer so that 
ordering is preserved
+                remainingEntriesFilteredForConsumer.add(consumer);
+            }
+        }
+        return entriesGroupedByConsumer;
+    }
+
+    // checks if the entry can be dispatched to the consumer
+    private boolean canDispatchEntry(Consumer consumer, Entry entry, ReadType 
readType, int stickyKeyHash) {
+        // check if the entry can be replayed to a recently joined consumer
+        Position maxLastSentPosition = 
resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType);
+        if (maxLastSentPosition != null && 
entry.getPosition().compareTo(maxLastSentPosition) > 0) {
+            return false;
+        }
+
+        // If redeliveryMessages contains messages that correspond to the same 
hash as the entry to be dispatched
+        // do not send those messages for order guarantee
+        if (readType == ReadType.Normal && 
redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
+            return false;
         }
+
+        return true;
+    }
+
+    /**
+     * Creates a filter for replaying messages. The filter is stateful and 
shouldn't be cached or reused.
+     * @see PersistentDispatcherMultipleConsumers#createFilterForReplay()
+     */
+    @Override
+    protected Predicate<Position> createFilterForReplay() {
+        return new ReplayPositionFilter();
+    }
+
+    /**
+     * Filter for replaying messages. The filter is stateful for a single 
invocation and shouldn't be cached, shared
+     * or reused. This is a short-lived object, and optimizing it for the "no 
garbage" coding style of Pulsar is
+     * unnecessary since the JVM can optimize allocations for short-lived 
objects.
+     */
+    private class ReplayPositionFilter implements Predicate<Position> {

Review Comment:
   Maybe we could move this class into its own file and give it an explicit field 
referencing the dispatcher, instead of relying on the implicit reference an inner class holds to its enclosing instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to