This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new c973603b02b [fix] [broker] In Key_Shared mode: remove unnecessary
mechanisms of message skip to avoid unnecessary consumption stuck (#20335)
c973603b02b is described below
commit c973603b02bb61d936e5759a65abc060e7c5e244
Author: fengyubiao <[email protected]>
AuthorDate: Sat May 20 03:38:24 2023 +0800
[fix] [broker] In Key_Shared mode: remove unnecessary mechanisms of message
skip to avoid unnecessary consumption stuck (#20335)
- https://github.com/apache/pulsar/pull/7105 provides a mechanism to prevent a
stuck consumer from affecting the consumption of other consumers:
- if none of the consumers can accept more messages, stop delivering messages
to the client.
- if one consumer cannot accept more messages, just read new messages
and deliver them to the other consumers.
- https://github.com/apache/pulsar/pull/7553 provides a mechanism to fix the
loss of consumption order: if a consumer cannot accept any more
messages, skip that consumer for the next round of message delivery, because
there may be messages with the same key in the replay queue.
- https://github.com/apache/pulsar/pull/10762 provides a mechanism to fix
the loss of consumption order: if there are any messages with the
same key in the replay queue, do not deliver new messages to this consumer.
https://github.com/apache/pulsar/pull/10762 and
https://github.com/apache/pulsar/pull/7553 do the same thing, and
https://github.com/apache/pulsar/pull/10762 is better than
https://github.com/apache/pulsar/pull/7553, so
https://github.com/apache/pulsar/pull/7553 is unnecessary.
Remove the mechanism provided by https://github.com/apache/pulsar/pull/7553
to prevent consumption from getting stuck unnecessarily.
(cherry picked from commit 1e664b7f550ffa28d3c810f3b7d6d625d5905eb3)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 24 +---------------------
1 file changed, 1 insertion(+), 23 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 90db639fde3..c5fdc950727 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -68,17 +68,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
*/
private final LinkedHashMap<Consumer, PositionImpl>
recentlyJoinedConsumers;
- private final Set<Consumer> stuckConsumers;
- private final Set<Consumer> nextStuckConsumers;
-
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf,
KeySharedMeta ksm) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new
LinkedHashMap<>();
- this.stuckConsumers = new HashSet<>();
- this.nextStuckConsumers = new HashSet<>();
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case AUTO_SPLIT:
@@ -208,8 +203,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
}
}
- nextStuckConsumers.clear();
-
final Map<Consumer, List<Entry>> groupedEntries =
localGroupedEntries.get();
groupedEntries.clear();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new
HashMap<>();
@@ -311,14 +304,11 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
- stuckConsumers.clear();
-
if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null ||
recentlyJoinedConsumers.isEmpty())) {
// This means, that all the messages we've just read cannot be
dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the
would not haven been triggered)
// 2. All keys in the current set of messages are routing to
consumers that are currently busy
- // and stuck is not caused by stuckConsumers
//
// The solution here is to move on and read next batch of messages
which might hopefully contain
// also keys meant for other consumers.
@@ -327,10 +317,7 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// ahead in the stream while the new consumers are not ready to
accept the new messages,
// therefore would be most likely only increase the distance
between read-position and mark-delete
// position.
- if (!nextStuckConsumers.isEmpty()) {
- isDispatcherStuckOnReplays = true;
- stuckConsumers.addAll(nextStuckConsumers);
- }
+ isDispatcherStuckOnReplays = true;
// readMoreEntries should run regardless whether or not stuck is
caused by
// stuckConsumers for avoid stopping dispatch.
topic.getBrokerService().executor().execute(() ->
readMoreEntries());
@@ -346,8 +333,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
private int getRestrictedMaxEntriesForConsumer(Consumer consumer,
List<Entry> entries, int maxMessages,
ReadType readType, Set<Integer> stickyKeyHashes) {
if (maxMessages == 0) {
- // the consumer was stuck
- nextStuckConsumers.add(consumer);
return 0;
}
if (readType == ReadType.Normal && stickyKeyHashes != null
@@ -364,13 +349,6 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// At this point, all the old messages were already consumed and this
consumer
// is now ready to receive any message
if (maxReadPosition == null) {
- // stop to dispatch by stuckConsumers
- if (stuckConsumers.contains(consumer)) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] stop to dispatch by stuckConsumers,
consumer: {}", name, consumer);
- }
- return 0;
- }
// The consumer has not recently joined, so we can send all
messages
return maxMessages;
}