This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 40a3b38c481 [improve][broker] Refactor a private method to eliminate an unnecessary parameter (#23915)
40a3b38c481 is described below
commit 40a3b38c48168190949d95432b5559ec19bf9e0e
Author: guan46 <[email protected]>
AuthorDate: Tue Feb 11 14:52:08 2025 +0800
[improve][broker] Refactor a private method to eliminate an unnecessary parameter (#23915)
---
...istentStickyKeyDispatcherMultipleConsumers.java | 33 +++++++++-------------
1 file changed, 14 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8bddbde02c9..9e92a2ab40d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -424,7 +424,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer consumer = null;
- MutableBoolean blockedByHash = null;
+ boolean blockedByHash = false;
boolean dispatchEntry = false;
// check if the hash is already blocked
boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash);
@@ -434,17 +434,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
- blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
+ final var canUpdateBlockedByHash = lookAheadAllowed && readType == ReadType.Normal;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the entry can be dispatched
- if (permits.intValue() > 0
- && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
- // decrement the permits for the consumer
- permits.decrement();
- // allow the entry to be dispatched
- dispatchEntry = true;
+ if (permits.intValue() > 0) {
+ boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash);
+ if (canDispatchEntry) {
+ // decrement the permits for the consumer
+ permits.decrement();
+ // allow the entry to be dispatched
+ dispatchEntry = true;
+ } else if (canUpdateBlockedByHash) {
+ blockedByHash = true;
+ }
}
}
}
@@ -458,7 +462,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// the hash is blocked, add it to the set of blocked hashes
alreadyBlockedHashes.add(stickyKeyHash);
}
- if (blockedByHash != null && blockedByHash.isTrue()) {
+ if (blockedByHash) {
// the entry is blocked by hash, add the consumer to the blocked set
blockedByHashConsumers.add(consumer);
}
@@ -507,27 +511,18 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// checks if the entry can be dispatched to the consumer
private boolean canDispatchEntry(Consumer consumer, Entry entry,
- ReadType readType, int stickyKeyHash,
- MutableBoolean blockedByHash) {
+ ReadType readType, int stickyKeyHash) {
// If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched
// do not send those messages for order guarantee
if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
- if (blockedByHash != null) {
- blockedByHash.setTrue();
- }
return false;
}
-
if (drainingHashesRequired) {
// If the hash is draining, do not send the message
if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) {
- if (blockedByHash != null) {
- blockedByHash.setTrue();
- }
return false;
}
}
-
return true;
}