This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c93d6873165 [fix] [broker] In Key_Shared mode: remove unnecessary 
mechanisms of message skip to avoid unnecessary consumption stuck (#20335)
c93d6873165 is described below

commit c93d687316555be37628db62fdf6cdd252af754c
Author: fengyubiao <[email protected]>
AuthorDate: Sat May 20 03:38:24 2023 +0800

    [fix] [broker] In Key_Shared mode: remove unnecessary mechanisms of message 
skip to avoid unnecessary consumption stuck (#20335)
    
    ### Motivation
    - https://github.com/apache/pulsar/pull/7105 provide a mechanism to avoid a 
stuck consumer affecting the consumption of other consumers:
      - if all consumers can not accept more messages, stop delivering messages 
to the client.
      - if one consumer can not accept more messages, just read new messages 
and deliver them to other consumers.
    - https://github.com/apache/pulsar/pull/7553 provide a mechanism to fix the 
issue of lost order of consumption: If the consumer cannot accept any more 
messages, skip the consumer for the next round of message delivery because 
there may be messages with the same key in the replay queue.
    - https://github.com/apache/pulsar/pull/10762 provide a mechanism to fix 
the issue of lost order of consumption: If there have any messages with the 
same key in the replay queue, do not deliver the new messages to this consumer.
    
    https://github.com/apache/pulsar/pull/10762 and 
https://github.com/apache/pulsar/pull/7553 do the same thing and 
https://github.com/apache/pulsar/pull/10762 is better than 
https://github.com/apache/pulsar/pull/7553 , so 
https://github.com/apache/pulsar/pull/7553 is unnecessary.
    
    ### Modifications
    remove the mechanism provided by https://github.com/apache/pulsar/pull/7553 
to avoid unnecessary consumption stuck.
    
    (cherry picked from commit 1e664b7f550ffa28d3c810f3b7d6d625d5905eb3)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 24 +---------------------
 1 file changed, 1 insertion(+), 23 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 1a8c6e180a2..8f05530f58b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -71,17 +71,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
      */
     private final LinkedHashMap<Consumer, PositionImpl> 
recentlyJoinedConsumers;
 
-    private final Set<Consumer> stuckConsumers;
-    private final Set<Consumer> nextStuckConsumers;
-
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
             Subscription subscription, ServiceConfiguration conf, 
KeySharedMeta ksm) {
         super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
 
         this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
         this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new 
LinkedHashMap<>();
-        this.stuckConsumers = new HashSet<>();
-        this.nextStuckConsumers = new HashSet<>();
         this.keySharedMode = ksm.getKeySharedMode();
         switch (this.keySharedMode) {
         case AUTO_SPLIT:
@@ -226,8 +221,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             }
         }
 
-        nextStuckConsumers.clear();
-
         final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
         groupedEntries.clear();
         final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new 
HashMap<>();
@@ -318,14 +311,11 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // acquire message-dispatch permits for already delivered messages
         acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, 
totalMessagesSent, totalBytesSent);
 
-        stuckConsumers.clear();
-
         if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || 
recentlyJoinedConsumers.isEmpty())) {
             // This means, that all the messages we've just read cannot be 
dispatched right now.
             // This condition can only happen when:
             //  1. We have consumers ready to accept messages (otherwise the 
would not haven been triggered)
             //  2. All keys in the current set of messages are routing to 
consumers that are currently busy
-            //     and stuck is not caused by stuckConsumers
             //
             // The solution here is to move on and read next batch of messages 
which might hopefully contain
             // also keys meant for other consumers.
@@ -334,10 +324,7 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             // ahead in the stream while the new consumers are not ready to 
accept the new messages,
             // therefore would be most likely only increase the distance 
between read-position and mark-delete
             // position.
-            if (!nextStuckConsumers.isEmpty()) {
-                isDispatcherStuckOnReplays = true;
-                stuckConsumers.addAll(nextStuckConsumers);
-            }
+            isDispatcherStuckOnReplays = true;
             return true;
         }  else if (currentThreadKeyNumber == 0) {
             return true;
@@ -348,8 +335,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
     private int getRestrictedMaxEntriesForConsumer(Consumer consumer, 
List<Entry> entries, int maxMessages,
             ReadType readType, Set<Integer> stickyKeyHashes) {
         if (maxMessages == 0) {
-            // the consumer was stuck
-            nextStuckConsumers.add(consumer);
             return 0;
         }
         if (readType == ReadType.Normal && stickyKeyHashes != null
@@ -366,13 +351,6 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // At this point, all the old messages were already consumed and this 
consumer
         // is now ready to receive any message
         if (maxReadPosition == null) {
-            // stop to dispatch by stuckConsumers
-            if (stuckConsumers.contains(consumer)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] stop to dispatch by stuckConsumers, 
consumer: {}", name, consumer);
-                }
-                return 0;
-            }
             // The consumer has not recently joined, so we can send all 
messages
             return maxMessages;
         }

Reply via email to