Demogorgon314 commented on code in PR #19560:
URL: https://github.com/apache/pulsar/pull/19560#discussion_r1112785155
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -57,10 +60,12 @@ public class ThresholdShedder implements
LoadSheddingStrategy {
private static final double MB = 1024 * 1024;
private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
+ private final Set<String> activeBrokers = Collections.newSetFromMap(new
ConcurrentHashMap<>());
Review Comment:
Why use a thread-safe set here? I see you already used `synchronized`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -233,5 +237,23 @@ private Pair<Boolean, String> getMaxUsageBroker(
}
return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
}
+ @Override
+ public void onBrokerChange(Set<String> newBrokers) {
+ synchronized (activeBrokers) {
+ activeBrokers.clear();
+ activeBrokers.addAll(newBrokers);
+ }
+ }
+
+ private void cleanUnActiveBroker() {
+ if (!activeBrokers.isEmpty()) {
+ synchronized (activeBrokers) {
+ if (!activeBrokers.isEmpty()) {
+ brokerAvgResourceUsage.keySet().removeIf((key) ->
!activeBrokers.contains(key));
Review Comment:
Also, we can use `keySet().retainAll(activeBrokers)` to get Intersection.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java:
##########
@@ -233,5 +237,23 @@ private Pair<Boolean, String> getMaxUsageBroker(
}
return Pair.of(hasBrokerBelowLowerBound, maxUsageBrokerName);
}
+ @Override
+ public void onBrokerChange(Set<String> newBrokers) {
+ synchronized (activeBrokers) {
+ activeBrokers.clear();
+ activeBrokers.addAll(newBrokers);
+ }
+ }
+
+ private void cleanUnActiveBroker() {
+ if (!activeBrokers.isEmpty()) {
+ synchronized (activeBrokers) {
+ if (!activeBrokers.isEmpty()) {
+ brokerAvgResourceUsage.keySet().removeIf((key) ->
!activeBrokers.contains(key));
Review Comment:
Can we clean this map in the `onBrokerChange` method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]