This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a3b38c481 [improve][broker] Refactor a private method to eliminate 
an unnecessary parameter (#23915)
40a3b38c481 is described below

commit 40a3b38c48168190949d95432b5559ec19bf9e0e
Author: guan46 <[email protected]>
AuthorDate: Tue Feb 11 14:52:08 2025 +0800

    [improve][broker] Refactor a private method to eliminate an unnecessary 
parameter (#23915)
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 33 +++++++++-------------
 1 file changed, 14 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8bddbde02c9..9e92a2ab40d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -424,7 +424,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
             }
             int stickyKeyHash = getStickyKeyHash(entry);
             Consumer consumer = null;
-            MutableBoolean blockedByHash = null;
+            boolean blockedByHash = false;
             boolean dispatchEntry = false;
             // check if the hash is already blocked
             boolean hashIsAlreadyBlocked = 
alreadyBlockedHashes.contains(stickyKeyHash);
@@ -434,17 +434,21 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                     if (lookAheadAllowed) {
                         consumersForEntriesForLookaheadCheck.add(consumer);
                     }
-                    blockedByHash = lookAheadAllowed && readType == 
ReadType.Normal ? new MutableBoolean(false) : null;
+                    final var canUpdateBlockedByHash = lookAheadAllowed && 
readType == ReadType.Normal;
                     MutableInt permits =
                             permitsForConsumer.computeIfAbsent(consumer,
                                     k -> new 
MutableInt(getAvailablePermits(k)));
                     // a consumer was found for the sticky key hash and the 
entry can be dispatched
-                    if (permits.intValue() > 0
-                            && canDispatchEntry(consumer, entry, readType, 
stickyKeyHash, blockedByHash)) {
-                        // decrement the permits for the consumer
-                        permits.decrement();
-                        // allow the entry to be dispatched
-                        dispatchEntry = true;
+                    if (permits.intValue() > 0) {
+                        boolean canDispatchEntry = canDispatchEntry(consumer, 
entry, readType, stickyKeyHash);
+                        if (canDispatchEntry) {
+                            // decrement the permits for the consumer
+                            permits.decrement();
+                            // allow the entry to be dispatched
+                            dispatchEntry = true;
+                        } else if (canUpdateBlockedByHash) {
+                            blockedByHash = true;
+                        }
                     }
                 }
             }
@@ -458,7 +462,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
                     // the hash is blocked, add it to the set of blocked hashes
                     alreadyBlockedHashes.add(stickyKeyHash);
                 }
-                if (blockedByHash != null && blockedByHash.isTrue()) {
+                if (blockedByHash) {
                     // the entry is blocked by hash, add the consumer to the 
blocked set
                     blockedByHashConsumers.add(consumer);
                 }
@@ -507,27 +511,18 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
     // checks if the entry can be dispatched to the consumer
     private boolean canDispatchEntry(Consumer consumer, Entry entry,
-                                     ReadType readType, int stickyKeyHash,
-                                     MutableBoolean blockedByHash) {
+                                     ReadType readType, int stickyKeyHash) {
         // If redeliveryMessages contains messages that correspond to the same 
hash as the entry to be dispatched
         // do not send those messages for order guarantee
         if (readType == ReadType.Normal && 
redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
-            if (blockedByHash != null) {
-                blockedByHash.setTrue();
-            }
             return false;
         }
-
         if (drainingHashesRequired) {
             // If the hash is draining, do not send the message
             if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, 
stickyKeyHash)) {
-                if (blockedByHash != null) {
-                    blockedByHash.setTrue();
-                }
                 return false;
             }
         }
-
         return true;
     }
 

Reply via email to