This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 2e289e64808 [improve][broker] PIP-380:
Support-setting-up-specific-namespaces-to-skipping-the-load-shedding (#23549)
2e289e64808 is described below
commit 2e289e6480801a0e0e694d1b1661e3984d675b30
Author: Kai Wang <[email protected]>
AuthorDate: Tue Jun 10 14:35:04 2025 +0800
[improve][broker] PIP-380:
Support-setting-up-specific-namespaces-to-skipping-the-load-shedding (#23549)
(cherry picked from commit 0f9ea181b084907ec8cb3d25535f7c6e3d2ffdc2)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 7 ++++
.../extensions/ExtensibleLoadManagerImpl.java | 28 +++++++++++++++-
.../loadbalance/extensions/models/TopKBundles.java | 11 ++++--
.../extensions/scheduler/TransferShedder.java | 9 +++++
.../RoundRobinBrokerSelectionStrategy.java | 37 ++++++++++++++++++++
.../loadbalance/impl/ModularLoadManagerImpl.java | 39 ++++++++++++++++++++--
.../extensions/ExtensibleLoadManagerImplTest.java | 35 +++++++++++++++++++
.../extensions/models/TopKBundlesTest.java | 22 ++++++++++++
.../extensions/scheduler/TransferShedderTest.java | 19 +++++++++++
9 files changed, 201 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index dede4543fc3..820c6b515e5 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3101,6 +3101,13 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean loadBalancerSheddingBundlesWithPoliciesEnabled = false;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "The namespaces to be excluded from load shedding"
+ )
+ private Set<String> loadBalancerSheddingExcludedNamespaces = new
HashSet<>();
+
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Time to wait before fixing any stuck in-flight service unit
states. "
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index ef29d7d9a74..af95df60174 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -84,6 +84,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactor
import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategyFactory;
import
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.RoundRobinBrokerSelectionStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.namespace.LookupOptions;
@@ -161,6 +162,8 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
@Getter
private final BrokerSelectionStrategy brokerSelectionStrategy;
+ private final BrokerSelectionStrategy
sheddingExcludedNamespaceSelectionStrategy;
+
@Getter
private final List<BrokerFilter> brokerFilterPipeline;
@@ -254,6 +257,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
this.brokerSelectionStrategy = createBrokerSelectionStrategy();
+ this.sheddingExcludedNamespaceSelectionStrategy = new
RoundRobinBrokerSelectionStrategy();
}
public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
@@ -636,11 +640,33 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
return Optional.empty();
}
Set<String> candidateBrokers =
availableBrokerCandidates.keySet();
- return
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
+ return
getBrokerSelectionStrategy(bundle).select(candidateBrokers, bundle, context);
});
});
}
+ /**
+ * For shedding-excluded namespaces, use RoundRobinBrokerSelector to assign the ownership.
+ * This spreads the assignment more evenly, because bundles of these namespaces are never
+ * automatically rebalanced to another broker unless they are manually unloaded.
+ *
+ * @param bundle the bundle to assign
+ * @return the broker selection strategy
+ */
+ private BrokerSelectionStrategy getBrokerSelectionStrategy(ServiceUnitId
bundle) {
+
+ Set<String> sheddingExcludedNamespaces =
conf.getLoadBalancerSheddingExcludedNamespaces();
+
+ var namespace = NamespaceBundle.getBundleNamespace(bundle.toString());
+ if (sheddingExcludedNamespaces.contains(namespace)) {
+ if (debug(conf, log)) {
+ log.info("Use round robin broker selector for {}", bundle);
+ }
+ return sheddingExcludedNamespaceSelectionStrategy;
+ }
+ return brokerSelectionStrategy;
+ }
+
@Override
public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
return getOwnershipAsync(topic, bundleUnit)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
index 9c6e9634178..481e907d044 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
@@ -68,8 +69,10 @@ public class TopKBundles {
public void update(Map<String, NamespaceBundleStats> bundleStats, int
topk) {
arr.clear();
try {
+ var conf = pulsar.getConfiguration();
var isLoadBalancerSheddingBundlesWithPoliciesEnabled =
-
pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled();
+ conf.isLoadBalancerSheddingBundlesWithPoliciesEnabled();
+ Set<String> sheddingExcludedNamespaces =
conf.getLoadBalancerSheddingExcludedNamespaces();
for (var etr : bundleStats.entrySet()) {
String bundle = etr.getKey();
var stat = etr.getValue();
@@ -79,12 +82,16 @@ public class TopKBundles {
continue;
}
// TODO: do not filter system topic while shedding
- if
(NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle)))
{
+ String namespace = NamespaceBundle.getBundleNamespace(bundle);
+ if (NamespaceService.isSystemServiceNamespace(namespace)) {
continue;
}
if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled &&
hasPolicies(bundle)) {
continue;
}
+ if (sheddingExcludedNamespaces.contains(namespace)) {
+ continue;
+ }
arr.add(etr);
}
var topKBundlesLoadData = loadData.getTopBundlesLoadData();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index b5255f2713a..18555bc18fa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -493,6 +493,7 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
}
int remainingTopBundles = maxBrokerTopBundlesLoadData.size();
+ Set<String> sheddingExcludedNamespaces =
conf.getLoadBalancerSheddingExcludedNamespaces();
for (var e : maxBrokerTopBundlesLoadData) {
String bundle = e.bundleName();
if (channel != null && !channel.isOwner(bundle,
maxBroker)) {
@@ -502,6 +503,14 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
}
continue;
}
+ final String namespaceName =
NamespaceBundle.getBundleNamespace(bundle);
+ if (sheddingExcludedNamespaces.contains(namespaceName)) {
+ if (debugMode) {
+ log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
+ + " Bundle namespace has been found in
sheddingExcludedNamespaces", bundle));
+ }
+ continue;
+ }
if (recentlyUnloadedBundles.containsKey(bundle)) {
if (debugMode) {
log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java
new file mode 100644
index 00000000000..2f356ec1f5e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/RoundRobinBrokerSelectionStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
+
+import java.util.Optional;
+import java.util.Set;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+/**
+ * Simple Round Robin Broker Selection Strategy.
+ */
+public class RoundRobinBrokerSelectionStrategy implements
BrokerSelectionStrategy {
+ private final RoundRobinBrokerSelector selector = new
RoundRobinBrokerSelector();
+
+ @Override
+ public Optional<String> select(Set<String> brokers, ServiceUnitId bundle,
LoadManagerContext context) {
+ return selector.selectBroker(brokers, null, null,
context.brokerConfiguration());
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 1f549cbb66e..a9d7ddd78e0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -152,6 +152,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// Strategy used to determine where new topics should be placed.
private ModularLoadManagerStrategy placementStrategy;
+ private ModularLoadManagerStrategy
sheddingExcludedNamespaceSelectionStrategy;
+
// Policies used to determine which brokers are available for particular
namespaces.
private SimpleResourceAllocationPolicies policies;
@@ -252,6 +254,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
defaultStats.msgRateOut = DEFAULT_MESSAGE_RATE;
placementStrategy = ModularLoadManagerStrategy.create(conf);
+ sheddingExcludedNamespaceSelectionStrategy = new
RoundRobinBrokerSelector();
policies = new SimpleResourceAllocationPolicies(pulsar);
filterPipeline.add(new BrokerLoadManagerClassFilter());
filterPipeline.add(new BrokerVersionFilter());
@@ -641,6 +644,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
final Map<String, Long> recentlyUnloadedBundles =
loadData.getRecentlyUnloadedBundles();
recentlyUnloadedBundles.keySet().removeIf(e ->
recentlyUnloadedBundles.get(e) < timeout);
+ Set<String> sheddingExcludedNamespaces =
conf.getLoadBalancerSheddingExcludedNamespaces();
final Multimap<String, String> bundlesToUnload =
loadSheddingStrategy.findBundlesForUnloading(loadData, conf);
bundlesToUnload.asMap().forEach((broker, bundles) -> {
@@ -648,6 +652,13 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
bundles.forEach(bundle -> {
final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ if (sheddingExcludedNamespaces.contains(namespaceName)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping load shedding for namespace
{}",
+
loadSheddingStrategy.getClass().getSimpleName(), namespaceName);
+ }
+ return;
+ }
if (!shouldNamespacePoliciesUnload(namespaceName, bundleRange,
broker)) {
return;
}
@@ -931,8 +942,22 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
brokerTopicLoadingPredicate);
}
- // Choose a broker among the potentially smaller filtered list,
when possible
- Optional<String> broker =
placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
+ Optional<String> broker;
+ // For shedding-excluded namespaces, use RoundRobinBrokerSelector to assign the ownership.
+ // This spreads the assignment more evenly, because bundles of these namespaces are never
+ // automatically rebalanced to another broker unless they are manually unloaded.
+ Set<String> sheddingExcludedNamespaces =
conf.getLoadBalancerSheddingExcludedNamespaces();
+ String namespaceNameFromBundleName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ if
(sheddingExcludedNamespaces.contains(namespaceNameFromBundleName)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Use round robin broker selector for {}",
bundle);
+ }
+ broker = sheddingExcludedNamespaceSelectionStrategy
+ .selectBroker(brokerCandidateCache, data, loadData,
conf);
+ } else {
+ // Choose a broker among the potentially smaller filtered
list, when possible
+ broker = placementStrategy.selectBroker(brokerCandidateCache,
data, loadData, conf);
+ }
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}",
broker, brokerCandidateCache);
}
@@ -1139,7 +1164,15 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
*/
private int selectTopKBundle() {
bundleArr.clear();
- bundleArr.addAll(loadData.getBundleData().entrySet());
+ Set<String> sheddingExcludedNamespaces =
conf.getLoadBalancerSheddingExcludedNamespaces();
+ for (Map.Entry<String, BundleData> entry :
loadData.getBundleData().entrySet()) {
+ String bundle = entry.getKey();
+ String namespace = NamespaceBundle.getBundleNamespace(bundle);
+ if (sheddingExcludedNamespaces.contains(namespace)) {
+ continue;
+ }
+ bundleArr.add(entry);
+ }
int maxNumberOfBundlesInBundleLoadReport = pulsar.getConfiguration()
.getLoadBalancerMaxNumberOfBundlesInBundleLoadReport();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 97921488002..65d017499fd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -206,6 +206,41 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
assertEquals(webServiceUrl.get().toString(),
brokerLookupData.get().getWebServiceUrl());
}
+ // Test that the load manager will use round-robin assignment
+ // if the namespace is in loadBalancerSheddingExcludedNamespaces.
+ @Test
+ public void testSelectBrokerForSheddingExcludedNamespaces() throws
Exception {
+
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of(defaultTestNamespace));
+ try {
+ Pair<TopicName, NamespaceBundle> topicAndBundle =
+ getBundleIsNotOwnByChangeEventTopic("test-topic" +
UUID.randomUUID());
+ NamespaceBundle bundle1 = topicAndBundle.getRight();
+ Optional<BrokerLookupData> brokerLookupData1 =
primaryLoadManager.assign(Optional.empty(), bundle1,
+ LookupOptions.builder().build()).get();
+ assertTrue(brokerLookupData1.isPresent());
+ log.info("Assign the bundle1 {} to {}", bundle1,
brokerLookupData1);
+
+ String webServiceUrl1 = brokerLookupData1.get().getWebServiceUrl();
+
+ Pair<TopicName, NamespaceBundle> topicAndBundle2 =
+ getBundleIsNotOwnByChangeEventTopic("test-topic-" +
UUID.randomUUID());
+
+ while
(topicAndBundle2.getRight().toString().equals(topicAndBundle.getRight().toString())
+ ||
primaryLoadManager.checkOwnershipAsync(Optional.empty(),
topicAndBundle2.getRight()).get()) {
+ topicAndBundle2 =
getBundleIsNotOwnByChangeEventTopic("test-topic-" + UUID.randomUUID());
+ }
+ NamespaceBundle bundle2 = topicAndBundle2.getRight();
+ Optional<BrokerLookupData> brokerLookupData2 =
primaryLoadManager.assign(Optional.empty(), bundle2,
+ LookupOptions.builder().build()).get();
+ assertTrue(brokerLookupData2.isPresent());
+ log.info("Assign the bundle2 {} to {}", bundle2,
brokerLookupData2);
+ String webServiceUrl2 = brokerLookupData2.get().getWebServiceUrl();
+ assertNotEquals(webServiceUrl1, webServiceUrl2);
+ } finally {
+
pulsar1.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of());
+ }
+ }
+
@Test
public void testLookupOptions() throws Exception {
Pair<TopicName, NamespaceBundle> topicAndBundle =
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
index 0f6ae9b2629..2be3108c638 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
+import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
@@ -136,6 +137,27 @@ public class TopKBundlesTest {
assertEquals(top0.bundleName(), bundle1);
}
+ @Test
+ public void testSheddingExcludedNamespaces() {
+ Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
+ var topKBundles = new TopKBundles(pulsar);
+
pulsar.getConfiguration().setLoadBalancerSheddingExcludedNamespaces(Set.of("my-tenant/my-namespace2"));
+ NamespaceBundleStats stats1 = new NamespaceBundleStats();
+ stats1.msgRateIn = 500;
+ bundleStats.put("my-tenant/my-namespace2/0x00000000_0x0FFFFFFF",
stats1);
+
+ NamespaceBundleStats stats2 = new NamespaceBundleStats();
+ stats2.msgRateIn = 10000;
+ stats2.msgThroughputOut = 10;
+ bundleStats.put(bundle1, stats2);
+
+ topKBundles.update(bundleStats, 2);
+
+ assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(),
1);
+ var top0 = topKBundles.getLoadData().getTopBundlesLoadData().get(0);
+ assertEquals(top0.bundleName(), bundle1);
+ }
+
@Test
public void testZeroMsgThroughputBundleStats() {
Map<String, NamespaceBundleStats> bundleStats = new HashMap<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 716e1f2a2a2..a144d84fd7a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -621,6 +621,25 @@ public class TransferShedderTest {
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}
+ @Test
+ public void testSheddingExcludedNamespaces() {
+ UnloadCounter counter = new UnloadCounter();
+ TransferShedder transferShedder = new TransferShedder(counter);
+ var ctx = setupContext();
+ ctx.brokerConfiguration().setLoadBalancerSheddingExcludedNamespaces(
+ Set.of("my-tenant/my-namespaceE", "my-tenant/my-namespaceD"));
+
+ var res = transferShedder.findBundlesForUnloading(ctx, new
HashMap<>(), Map.of());
+ var expected = new HashSet<UnloadDecision>();
+ expected.add(new UnloadDecision(new Unload("broker3:8080",
+ "my-tenant/my-namespaceC/0x00000000_0x0FFFFFFF",
+ Optional.of("broker1:8080")),
+ Success, Overloaded));
+ assertEquals(res, expected);
+ assertEquals(counter.getLoadAvg(), setupLoadAvg);
+ assertEquals(counter.getLoadStd(), setupLoadStd);
+ }
+
@Test
public void testGetAvailableBrokersFailed() {
UnloadCounter counter = new UnloadCounter();