equanz commented on code in PR #23231:
URL: https://github.com/apache/pulsar/pull/23231#discussion_r1738418649
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -553,17 +602,26 @@ private synchronized Position updateIfNeededAndGetLastSentPosition() {
return lastSentPosition;
}
+ /**
+ * The dispatcher will skip replaying messages when all messages in the replay queue are filtered out when
+ * isDispatcherStuckOnReplays=true. The flag gets resetted after the call.
Review Comment:
```suggestion
* skipNextReplayToTriggerLookAhead=true. The flag gets reset after the call.
```
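The suggested javadoc describes a one-shot flag: once `skipNextReplayToTriggerLookAhead` is set, the next read skips the replay queue so the dispatcher can look ahead, and the check clears the flag again. Below is a minimal Java sketch of that pattern. It assumes a simplified dispatcher that checks the flag exactly once per read. `OneShotLookAheadFlagSketch`, `markStuckOnReplays`, `allowReplayOnThisRead` and `nextReadSource` are hypothetical names, not Pulsar's API.

```java
/**
 * Hypothetical model of a one-shot "skip the next replay" flag.
 * Not Pulsar's API; names are illustrative only.
 */
final class OneShotLookAheadFlagSketch {
    // Set when the dispatcher is stuck on replays; consumed by the next read.
    private boolean skipNextReplayToTriggerLookAhead;

    void markStuckOnReplays() {
        skipNextReplayToTriggerLookAhead = true;
    }

    /** Returns false exactly once after markStuckOnReplays(), then true again. */
    boolean allowReplayOnThisRead() {
        if (skipNextReplayToTriggerLookAhead) {
            // Reset the flag so that only a single read skips the replay queue.
            skipNextReplayToTriggerLookAhead = false;
            return false;
        }
        return true;
    }

    /** Chooses where the next read comes from: the replay queue or the cursor. */
    String nextReadSource(boolean hasMessagesToReplay) {
        // Evaluate the flag on every read so it is consumed even with an empty replay queue.
        boolean replayAllowed = allowReplayOnThisRead();
        return (hasMessagesToReplay && replayAllowed) ? "replay" : "normal";
    }
}
```

After `markStuckOnReplays()`, one call to `nextReadSource(true)` returns "normal" (a read ahead from the cursor). The following call returns "replay" again.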
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -345,26 +345,24 @@ public synchronized void readMoreEntries() {
return;
}
- NavigableSet<Position> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
- NavigableSet<Position> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
- if (!messagesToReplayFiltered.isEmpty()) {
+ Set<Position> messagesToReplayNow =
+ canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet();
+ if (!messagesToReplayNow.isEmpty()) {
Review Comment:
memo: Moved from `getMessagesToReplayNow`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +373,159 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
- isDispatcherStuckOnReplays = true;
+ skipNextReplayToTriggerLookAhead = true;
return true;
- } else if (currentThreadKeyNumber == 0) {
+ } else if (entriesByConsumerForDispatching.size() == 0) {
+ // There should be a backoff delay before retrying
return true;
}
return false;
}
- private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? extends Position> entries,
- int availablePermits, ReadType readType, Set<Integer> stickyKeyHashes) {
- int maxMessages = Math.min(entries.size(), availablePermits);
- if (maxMessages == 0) {
- return 0;
+ private boolean isLookAheadAllowed() {
+ if (serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
Review Comment:
In my understanding, the selector will also select consumers that recently joined. Messages selected for the new consumer are not dispatched because of the recentlyJoinedConsumers feature. This may create a large ack hole (I haven't verified this, so it could be different...).
But I don't think it will be a major problem, because this PR introduces the limit and this feature is disabled by default.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -274,96 +250,54 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}
- final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
- groupedEntries.clear();
- final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();
+ final Map<Consumer, List<Entry>> entriesByConsumerForDispatching =
+ filterAndGroupEntriesForDispatching(entries, readType);
- for (Entry entry : entries) {
- int stickyKeyHash = getStickyKeyHash(entry);
- Consumer c = selector.select(stickyKeyHash);
- if (c != null) {
- groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
- consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
- } else {
- addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
- entry.release();
- }
- }
-
- AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
-
- int currentThreadKeyNumber = groupedEntries.size();
- if (currentThreadKeyNumber == 0) {
- currentThreadKeyNumber = -1;
- }
- for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
+ AtomicInteger remainingConsumersToFinishSending = new AtomicInteger(entriesByConsumerForDispatching.size());
Review Comment:
memo: Is this value the correct number of targets? => It seems to be
correct. There are no filters below.
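The counter starts at the number of consumers in `entriesByConsumerForDispatching`, so the memo's check matters. Each consumer must account for exactly one decrement. If the initial value were larger than the number of sends, the count would never reach zero. If it were smaller, it would reach zero before all sends finished. Below is a minimal sketch of that countdown pattern. It assumes each per-consumer send completes a `CompletableFuture`. `SendCountdownSketch`, `sendAsync` and `onAllSent` are hypothetical names, not taken from the PR.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class SendCountdownSketch {
    private SendCountdownSketch() {
    }

    /**
     * Starts one asynchronous send per consumer and runs onAllSent exactly once,
     * after the last send completes (successfully or not).
     *
     * @return false if there was nothing to send, so the caller can schedule a retry
     */
    static <C, E> boolean sendToAll(Map<C, List<E>> entriesByConsumer,
                                    Function<List<E>, CompletableFuture<Void>> sendAsync,
                                    Runnable onAllSent) {
        if (entriesByConsumer.isEmpty()) {
            return false;
        }
        // One pending send per map entry. Nothing is filtered out below,
        // so the initial count matches the number of decrements.
        AtomicInteger remaining = new AtomicInteger(entriesByConsumer.size());
        for (List<E> entries : entriesByConsumer.values()) {
            sendAsync.apply(entries).whenComplete((ignored, error) -> {
                // The thread that completes the last send runs the follow-up step.
                if (remaining.decrementAndGet() == 0) {
                    onAllSent.run();
                }
            });
        }
        return true;
    }
}
```

The follow-up runs once, on whichever thread finishes the last send. A failed send still counts as finished, so one error cannot stall the dispatcher.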
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -439,36 +373,159 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
- isDispatcherStuckOnReplays = true;
+ skipNextReplayToTriggerLookAhead = true;
return true;
- } else if (currentThreadKeyNumber == 0) {
+ } else if (entriesByConsumerForDispatching.size() == 0) {
+ // There should be a backoff delay before retrying
return true;
}
return false;
}
- private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<? extends Position> entries,
- int availablePermits, ReadType readType, Set<Integer> stickyKeyHashes) {
- int maxMessages = Math.min(entries.size(), availablePermits);
- if (maxMessages == 0) {
- return 0;
+ private boolean isLookAheadAllowed() {
+ if (serviceConfig.isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist()
+ || (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
+ long keySharedNumberOfReplayMessagesThresholdForLookAhead = Math.max(
+ serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer()
+ * consumerList.size(),
+ serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+ if (keySharedNumberOfReplayMessagesThresholdForLookAhead == 0
+ || redeliveryMessages.size() < keySharedNumberOfReplayMessagesThresholdForLookAhead) {
+ return true;
+ }
}
- if (readType == ReadType.Normal && stickyKeyHashes != null
- && redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
- // If redeliveryMessages contains messages that correspond to the same hash as the messages
- // that the dispatcher is trying to send, do not send those messages for order guarantee
- return 0;
+ return false;
+ }
+
+ // groups the entries by consumer and filters out the entries that should not be dispatched
+ // the entries are handled in the order they are received instead of first grouping them by consumer and
+ // then filtering them
+ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entry> entries, ReadType readType) {
+ // entries grouped by consumer
+ Map<Consumer, List<Entry>> entriesGroupedByConsumer = new HashMap<>();
+ // consumers for which all remaining entries should be discarded
+ Set<Consumer> remainingEntriesFilteredForConsumer = new HashSet<>();
+
+ for (Entry entry : entries) {
+ int stickyKeyHash = getStickyKeyHash(entry);
+ Consumer consumer = selector.select(stickyKeyHash);
+ boolean dispatchEntry = false;
+ // a consumer was found for the sticky key hash and the consumer can get more entries
+ if (consumer != null && !remainingEntriesFilteredForConsumer.contains(consumer)) {
+ dispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash);
+ }
+ if (dispatchEntry) {
+ // add the entry to consumer's entry list for dispatching
+ List<Entry> consumerEntries =
+ entriesGroupedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>());
+ consumerEntries.add(entry);
+ } else {
+ // add the message to replay
+ addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
+ // release the entry as it will not be dispatched
+ entry.release();
+ // stop sending further entries to this consumer so that ordering is preserved
+ remainingEntriesFilteredForConsumer.add(consumer);
+ }
+ }
+ return entriesGroupedByConsumer;
+ }
+
+ // checks if the entry can be dispatched to the consumer
+ private boolean canDispatchEntry(Consumer consumer, Entry entry, ReadType readType, int stickyKeyHash) {
+ // check if the entry can be replayed to a recently joined consumer
+ Position maxLastSentPosition = resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType);
+ if (maxLastSentPosition != null && entry.getPosition().compareTo(maxLastSentPosition) > 0) {
+ return false;
+ }
+
+ // If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched
+ // do not send those messages for order guarantee
+ if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
+ return false;
}
+
+ return true;
+ }
+
+ /**
+ * Creates a filter for replaying messages. The filter is stateful and shouldn't be cached or reused.
+ * @see PersistentDispatcherMultipleConsumers#createFilterForReplay()
+ */
+ @Override
+ protected Predicate<Position> createFilterForReplay() {
+ return new ReplayPositionFilter();
+ }
+
+ /**
+ * Filter for replaying messages. The filter is stateful for a single invocation and shouldn't be cached, shared
+ * or reused. This is a short-lived object, and optimizing it for the "no garbage" coding style of Pulsar is
+ * unnecessary since the JVM can optimize allocations for short-lived objects.
+ */
+ private class ReplayPositionFilter implements Predicate<Position> {
+ // tracks the available permits for each consumer for the duration of the filter usage
+ // the filter is stateful and shouldn't be shared or reused later
+ private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap<>();
+
+ @Override
+ public boolean test(Position position) {
+ // if out of order delivery is allowed, then any position will be replayed
+ if (isAllowOutOfOrderDelivery()) {
+ return true;
+ }
+ // lookup the sticky key hash for the entry at the replay position
+ Long stickyKeyHash = redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId());
+ if (stickyKeyHash == null) {
+ // the sticky key hash is missing for delayed messages, the filtering will happen at the time of
+ // dispatch after reading the entry from the ledger
+ log.debug("[{}] replay of entry at position {} doesn't contain sticky key hash.", name, position);
+ return true;
+ }
+ // find the consumer for the sticky key hash
+ Consumer consumer = selector.select(stickyKeyHash.intValue());
Review Comment:
memo: Can the dispatcher add new consumers after a message has already been evaluated but before it has been delivered? If so, is there any problem? => I think the dispatcher can do that. However, even if it does, this is not a problem, because this evaluation only affects which entries are read, not the actual delivery.
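The ordering rule in `filterAndGroupEntriesForDispatching`, quoted in this hunk, can be modeled in isolation. Entries are visited in read order. Once one entry for a consumer is held back for replay, every later entry for that consumer in the same batch is held back too, even if it would pass `canDispatchEntry` on its own. The sketch below rests on several assumptions: consumers are identified by strings, `Entry` is a simplified record, and the caller supplies the selector and dispatch predicates. All names here are hypothetical. The sketch does not model entry reference counting, whereas the PR calls `entry.release()` on held-back entries.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.IntFunction;

final class FilterAndGroupSketch {
    private FilterAndGroupSketch() {
    }

    /** Simplified stand-in for a ledger entry: an entry id plus its sticky key hash. */
    record Entry(long id, int stickyKeyHash) {
    }

    /**
     * Groups entries by the consumer chosen for their sticky key hash. Entries that
     * cannot be dispatched are appended to replayQueue. From then on, every later
     * entry for the same consumer is also sent to replay to preserve order.
     */
    static Map<String, List<Entry>> filterAndGroup(List<Entry> entries,
                                                   IntFunction<String> selectConsumer,
                                                   BiPredicate<String, Entry> canDispatch,
                                                   List<Entry> replayQueue) {
        Map<String, List<Entry>> grouped = new LinkedHashMap<>();
        // Consumers for which all remaining entries in this batch are held back.
        Set<String> blocked = new HashSet<>();
        for (Entry entry : entries) {
            String consumer = selectConsumer.apply(entry.stickyKeyHash());
            boolean dispatch = consumer != null
                    && !blocked.contains(consumer)
                    && canDispatch.test(consumer, entry);
            if (dispatch) {
                grouped.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
            } else {
                replayQueue.add(entry);
                // A null consumer never dispatches anyway, so only real consumers are tracked.
                if (consumer != null) {
                    blocked.add(consumer);
                }
            }
        }
        return grouped;
    }
}
```

Take entries 1, 2 and 3 mapping to consumer A, with entry 2 failing the dispatch check. Entry 3 is then held back only because entry 2 was held back. Dispatching entry 3 would let it overtake entry 2.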
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -386,13 +384,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
- NavigableSet<Position> toReplay = getMessagesToReplayNow(1);
- if (!toReplay.isEmpty()) {
- minReplayedPosition = toReplay.first();
- redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
Review Comment:
memo: Why can this line be removed? => Because `updateMinReplayedPosition`
doesn't remove the delayed messages from the tracker.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.