lhotari commented on code in PR #23231:
URL: https://github.com/apache/pulsar/pull/23231#discussion_r1734810618
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -436,36 +370,159 @@ protected synchronized boolean
trySendMessagesToConsumers(ReadType readType, Lis
// ahead in the stream while the new consumers are not ready to
accept the new messages,
// therefore would be most likely only increase the distance
between read-position and mark-delete
// position.
- isDispatcherStuckOnReplays = true;
+ skipNextReplayToTriggerLookAhead = true;
return true;
- } else if (currentThreadKeyNumber == 0) {
+ } else if (entriesByConsumerForDispatching.size() == 0) {
+ // There should be a backoff delay before retrying
return true;
}
return false;
}
- private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<?
extends Position> entries,
- int availablePermits, ReadType readType, Set<Integer>
stickyKeyHashes) {
- int maxMessages = Math.min(entries.size(), availablePermits);
- if (maxMessages == 0) {
- return 0;
+ private boolean isLookAheadAllowed() {
+ if
(serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+ || (recentlyJoinedConsumers == null ||
recentlyJoinedConsumers.isEmpty())) {
+ long keySharedNumberOfReplayMessagesThresholdForLookAhead =
Math.max(
+
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+ * consumerList.size(),
+
serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+ if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+ || redeliveryMessages.size() <
keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+ return true;
+ }
}
- if (readType == ReadType.Normal && stickyKeyHashes != null
- &&
redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
- // If redeliveryMessages contains messages that correspond to the
same hash as the messages
- // that the dispatcher is trying to send, do not send those
messages for order guarantee
- return 0;
+ return false;
+ }
+
+ // groups the entries by consumer and filters out the entries that should
not be dispatched
+ // the entries are handled in the order they are received instead of first
grouping them by consumer and
+ // then filtering them
+ private Map<Consumer, List<Entry>>
filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+ // entries grouped by consumer
+ Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+ // consumers for which all remaining entries should be discarded
+ Set<Consumer> remainingEntriesFilteredForConsumer = new HashSet<>();
+
+ for (Entry entry : entries) {
+ int stickyKeyHash = getStickyKeyHash(entry);
+ Consumer consumer = selector.select(stickyKeyHash);
+ boolean dispatchEntry = false;
+ // a consumer was found for the sticky key hash and the consumer
can get more entries
+ if (consumer != null &&
!remainingEntriesFilteredForConsumer.contains(consumer)) {
+ dispatchEntry = canDispatchEntry(consumer, entry, readType,
stickyKeyHash);
+ }
+ if (dispatchEntry) {
+ // add the entry to consumer's entry list for dispatching
+ List<Entry> consumerEntries =
+ entriesGroupedByConsumer.computeIfAbsent(consumer, k
-> new ArrayList<>());
+ consumerEntries.add(entry);
+ } else {
+ // add the message to replay
+ addMessageToReplay(entry.getLedgerId(), entry.getEntryId(),
stickyKeyHash);
+ // release the entry as it will not be dispatched
+ entry.release();
+ // stop sending further entries to this consumer so that
ordering is preserved
+ remainingEntriesFilteredForConsumer.add(consumer);
+ }
+ }
+ return entriesGroupedByConsumer;
+ }
+
+ // checks if the entry can be dispatched to the consumer
+ private boolean canDispatchEntry(Consumer consumer, Entry entry, ReadType
readType, int stickyKeyHash) {
+ // check if the entry can be replayed to a recently joined consumer
+ Position maxLastSentPosition =
resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType);
+ if (maxLastSentPosition != null &&
entry.getPosition().compareTo(maxLastSentPosition) > 0) {
+ return false;
+ }
+
+ // If redeliveryMessages contains messages that correspond to the same
hash as the entry to be dispatched
+ // do not send those messages for order guarantee
+ if (readType == ReadType.Normal &&
redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
+ return false;
}
Review Comment:
This change also solves another existing problem. Previously, all progress
was blocked whenever any of the messages that were read were in the replay queue:
https://github.com/apache/pulsar/blob/d7e8ea16e6682df9a9354cda25cf4f1f9cb54429/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L453-L458
The previous behavior defeats the purpose of the look-ahead feature added in
#7105 and makes the problem worse.
This PR fixes that problem as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]