lhotari commented on code in PR #23231:
URL: https://github.com/apache/pulsar/pull/23231#discussion_r1746590301
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +372,162 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
// ahead in the stream while the new consumers are not ready to
accept the new messages,
// therefore would be most likely only increase the distance
between read-position and mark-delete
// position.
- isDispatcherStuckOnReplays = true;
+ skipNextReplayToTriggerLookAhead = true;
return true;
- } else if (currentThreadKeyNumber == 0) {
+ } else if (entriesByConsumerForDispatching.size() == 0) {
+ // There should be a backoff delay before retrying
return true;
}
return false;
}
- private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<?
extends Position> entries,
- int availablePermits, ReadType readType, Set<Integer>
stickyKeyHashes) {
- int maxMessages = Math.min(entries.size(), availablePermits);
- if (maxMessages == 0) {
- return 0;
+ private boolean isLookAheadAllowed() {
+ if
(serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+ || (recentlyJoinedConsumers == null ||
recentlyJoinedConsumers.isEmpty())) {
+ long keySharedNumberOfReplayMessagesThresholdForLookAhead =
Math.max(
+
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+ * consumerList.size(),
+
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+ if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+ || redeliveryMessages.size() <
keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+ return true;
+ }
}
- if (readType == ReadType.Normal && stickyKeyHashes != null
- &&
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
- // If redeliveryMessages contains messages that correspond to the
same hash as the messages
- // that the dispatcher is trying to send, do not send those
messages for order guarantee
- return 0;
+ return false;
+ }
+
+ // groups the entries by consumer and filters out the entries that should
not be dispatched
+ // the entries are handled in the order they are received instead of first
grouping them by consumer and
+ // then filtering them
+ private Map<Consumer, List<Entry>>
filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+ // entries grouped by consumer
+ Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+ // permits for consumer, permits are for entries/batches
+ Map<Consumer, MutableInt> permitsForConsumer = new HashMap<>();
+
+ for (Entry entry : entries) {
+ int stickyKeyHash = getStickyKeyHash(entry);
+ Consumer consumer = selector.select(stickyKeyHash);
+ boolean dispatchEntry = false;
+ // a consumer was found for the sticky key hash and the entry can
be dispatched
+ if (consumer != null && canDispatchEntry(consumer, entry,
readType, stickyKeyHash)) {
Review Comment:
Thanks for the context, @poorbarcode. I see now what I need to change to
optimize it.
I'll cache the result of the
resolveMaxLastSentPositionForRecentlyJoinedConsumer call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]