This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 963260abfa1 [fix] [broker] Make `LeastResourceUsageWithWeight` thread
safe (#20159)
963260abfa1 is described below
commit 963260abfa142b8cf9ffe372d85d470a78c17235
Author: lifepuzzlefun <[email protected]>
AuthorDate: Sun Apr 23 11:27:48 2023 +0800
[fix] [broker] Make `LeastResourceUsageWithWeight` thread safe (#20159)
Fix #20160
### Motivation
LeastResourceUsageWithWeight is an stateful object and current code will be
accessed by multithread.
thread 1: is execute
https://github.com/apache/pulsar/blob/2b41e4eafb1cba0e548dd90df60e8cdbb24cd490/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java#L89-L91
and thread 2: is execute
https://github.com/apache/pulsar/blob/2b41e4eafb1cba0e548dd90df60e8cdbb24cd490/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java#L147-L150
so an IndexOutOfBound occurs.
### Modifications
change the state to `ThreadLocal`
---
.../strategy/LeastResourceUsageWithWeight.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
index 902cfdaf73f..98986d84b98 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.concurrent.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
@@ -35,14 +36,15 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
* cause cluster fluctuations due to short-term load jitter.
*/
@Slf4j
+@ThreadSafe
public class LeastResourceUsageWithWeight implements BrokerSelectionStrategy {
// Maintain this list to reduce object creation.
- private final ArrayList<String> bestBrokers;
- private final Set<String> noLoadDataBrokers;
+ private final ThreadLocal<ArrayList<String>> bestBrokers;
+ private final ThreadLocal<HashSet<String>> noLoadDataBrokers;
public LeastResourceUsageWithWeight() {
- this.bestBrokers = new ArrayList<>();
- this.noLoadDataBrokers = new HashSet<>();
+ this.bestBrokers = ThreadLocal.withInitial(ArrayList::new);
+ this.noLoadDataBrokers = ThreadLocal.withInitial(HashSet::new);
}
// A broker's max resource usage with weight using its historical load and
short-term load data with weight.
@@ -70,7 +72,6 @@ public class LeastResourceUsageWithWeight implements
BrokerSelectionStrategy {
/**
* Find a suitable broker to assign the given bundle to.
- * This method is not thread safety.
*
* @param candidates The candidates for which the bundle may be
assigned.
* @param bundleToAssign The data for the bundle to assign.
@@ -86,6 +87,9 @@ public class LeastResourceUsageWithWeight implements
BrokerSelectionStrategy {
return Optional.empty();
}
+ ArrayList<String> bestBrokers = this.bestBrokers.get();
+ HashSet<String> noLoadDataBrokers = this.noLoadDataBrokers.get();
+
bestBrokers.clear();
noLoadDataBrokers.clear();
// Maintain of list of all the best scoring brokers and then randomly
@@ -135,9 +139,7 @@ public class LeastResourceUsageWithWeight implements
BrokerSelectionStrategy {
log.info("Assign randomly as none of the brokers are
underloaded. candidatesSize:{}, "
+ "noLoadDataBrokersSize:{}", candidates.size(),
noLoadDataBrokers.size());
}
- for (String broker : candidates) {
- bestBrokers.add(broker);
- }
+ bestBrokers.addAll(candidates);
}
if (debugMode) {