This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b6a73823ce2 [improve][broker] PIP-192: Support broker isolation policy
(#19592)
b6a73823ce2 is described below
commit b6a73823ce2ed47baa606b2ecc5f1569e4c101a5
Author: Kai Wang <[email protected]>
AuthorDate: Sat Mar 11 22:59:29 2023 +0800
[improve][broker] PIP-192: Support broker isolation policy (#19592)
---
.../extensions/ExtensibleLoadManagerImpl.java | 10 +-
.../extensions/filter/BrokerFilter.java | 12 +-
...ter.java => BrokerIsolationPoliciesFilter.java} | 39 ++--
.../filter/BrokerMaxTopicCountFilter.java | 8 +
.../extensions/filter/BrokerVersionFilter.java | 11 +-
.../policies/IsolationPoliciesHelper.java | 68 ++++++
.../extensions/policies/package-info.java | 19 ++
.../extensions/scheduler/TransferShedder.java | 79 ++++++-
.../extensions/ExtensibleLoadManagerImplTest.java | 61 +++++-
.../filter/BrokerIsolationPoliciesFilterTest.java | 222 ++++++++++++++++++++
.../filter/BrokerMaxTopicCountFilterTest.java | 2 +-
.../extensions/filter/BrokerVersionFilterTest.java | 13 +-
.../extensions/scheduler/TransferShedderTest.java | 227 +++++++++++++++++++--
13 files changed, 715 insertions(+), 56 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 2bebe203d87..82790f44fcb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -42,6 +42,7 @@ import
org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
import
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
import
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
@@ -139,6 +140,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
public ExtensibleLoadManagerImpl() {
this.brokerFilterPipeline = new ArrayList<>();
this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
+ this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
this.brokerFilterPipeline.add(new BrokerVersionFilter());
// TODO: Make brokerSelectionStrategy configurable.
this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
@@ -225,6 +227,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
+ this.brokerFilterPipeline.forEach(brokerFilter ->
brokerFilter.initialize(pulsar));
}
@Override
@@ -287,7 +290,6 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
BrokerRegistry brokerRegistry = getBrokerRegistry();
return brokerRegistry.getAvailableBrokerLookupDataAsync()
.thenCompose(availableBrokers -> {
- // TODO: Support isolation policies
LoadManagerContext context = this.getContext();
Map<String, BrokerLookupData> availableBrokerCandidates =
new HashMap<>(availableBrokers);
@@ -296,11 +298,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
for (final BrokerFilter filter : filterPipeline) {
try {
- filter.filter(availableBrokerCandidates, context);
+ filter.filter(availableBrokerCandidates, bundle,
context);
+ // Preserve the filter successes result.
+
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
} catch (BrokerFilterException e) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers.", e);
- availableBrokerCandidates = availableBrokers;
+ availableBrokerCandidates = new
HashMap<>(availableBrokers);
}
}
if (availableBrokerCandidates.isEmpty()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 35f4b6817f1..30d25f559b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -19,9 +19,11 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;
import java.util.Map;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
/**
* Filter out unqualified Brokers, which are not entered into LoadBalancer for
decision-making.
@@ -33,14 +35,22 @@ public interface BrokerFilter {
*/
String name();
+ /**
+ * Initialize this broker filter using the given pulsar service.
+ */
+ void initialize(PulsarService pulsar);
+
/**
* Filter out unqualified brokers based on implementation.
*
* @param brokers The full broker and lookup data.
+ * @param serviceUnit The current serviceUnit.
* @param context The load manager context.
* @return Filtered broker list.
*/
- Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers, LoadManagerContext context)
+ Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context)
throws BrokerFilterException;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
similarity index 54%
copy from
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
copy to
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
index e3f8faca324..b28c77f76f3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
@@ -19,15 +19,23 @@
package org.apache.pulsar.broker.loadbalance.extensions.filter;
import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
-import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
+import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
-public class BrokerMaxTopicCountFilter implements BrokerFilter {
- public static final String FILTER_NAME = "broker_max_topic_count_filter";
+@Slf4j
+public class BrokerIsolationPoliciesFilter implements BrokerFilter {
+
+ public static final String FILTER_NAME =
"broker_isolation_policies_filter";
+
+ private IsolationPoliciesHelper isolationPoliciesHelper;
@Override
public String name() {
@@ -35,15 +43,18 @@ public class BrokerMaxTopicCountFilter implements
BrokerFilter {
}
@Override
- public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers,
- LoadManagerContext context)
throws BrokerFilterException {
- int loadBalancerBrokerMaxTopics =
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
- brokers.keySet().removeIf(broker -> {
- Optional<BrokerLoadData> brokerLoadDataOpt =
context.brokerLoadDataStore().get(broker);
- long topics =
brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0);
- // TODO: The broker load data might be delayed, so the max topic
check might not accurate.
- return topics >= loadBalancerBrokerMaxTopics;
- });
- return brokers;
+ public void initialize(PulsarService pulsar) {
+ this.isolationPoliciesHelper = new IsolationPoliciesHelper(new
SimpleResourceAllocationPolicies(pulsar));
+ }
+
+ @Override
+ public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
availableBrokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context)
+ throws BrokerFilterException {
+ Set<String> brokerCandidateCache =
+
isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, serviceUnit);
+ availableBrokers.keySet().retainAll(brokerCandidateCache);
+ return availableBrokers;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index e3f8faca324..b98edd3d425 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -20,10 +20,12 @@ package
org.apache.pulsar.broker.loadbalance.extensions.filter;
import java.util.Map;
import java.util.Optional;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
public class BrokerMaxTopicCountFilter implements BrokerFilter {
@@ -34,8 +36,14 @@ public class BrokerMaxTopicCountFilter implements
BrokerFilter {
return FILTER_NAME;
}
+ @Override
+ public void initialize(PulsarService pulsar) {
+ // No-op
+ }
+
@Override
public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers,
+ ServiceUnitId serviceUnit,
LoadManagerContext context)
throws BrokerFilterException {
int loadBalancerBrokerMaxTopics =
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
brokers.keySet().removeIf(broker -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
index 869fb049a3c..b7332a5ff10 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
@@ -22,11 +22,13 @@ import com.github.zafarkhaja.semver.Version;
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
/**
* Filter by broker version.
@@ -45,7 +47,9 @@ public class BrokerVersionFilter implements BrokerFilter {
*
*/
@Override
- public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers, LoadManagerContext context)
+ public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers,
+ ServiceUnitId serviceUnit,
+ LoadManagerContext context)
throws BrokerFilterException {
ServiceConfiguration conf = context.brokerConfiguration();
if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
@@ -144,4 +148,9 @@ public class BrokerVersionFilter implements BrokerFilter {
public String name() {
return FILTER_NAME;
}
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ // No-op
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
new file mode 100644
index 00000000000..4d7a5bf22d6
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+ private final SimpleResourceAllocationPolicies policies;
+
+ public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+ this.policies = policies;
+ }
+
+ private static final FastThreadLocal<Set<String>>
localBrokerCandidateCache = new FastThreadLocal<>() {
+ @Override
+ protected Set<String> initialValue() {
+ return new HashSet<>();
+ }
+ };
+
+ public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData>
availableBrokers,
+ ServiceUnitId serviceUnit) {
+ Set<String> brokerCandidateCache = localBrokerCandidateCache.get();
+ brokerCandidateCache.clear();
+ LoadManagerShared.applyNamespacePolicies(serviceUnit, policies,
brokerCandidateCache,
+ availableBrokers.keySet(), new
LoadManagerShared.BrokerTopicLoadingPredicate() {
+ @Override
+ public boolean isEnablePersistentTopics(String brokerUrl) {
+ BrokerLookupData lookupData =
availableBrokers.get(brokerUrl.replace("http://", ""));
+ return lookupData != null &&
lookupData.persistentTopicsEnabled();
+ }
+
+ @Override
+ public boolean isEnableNonPersistentTopics(String
brokerUrl) {
+ BrokerLookupData lookupData =
availableBrokers.get(brokerUrl.replace("http://", ""));
+ return lookupData != null &&
lookupData.nonPersistentTopicsEnabled();
+ }
+ });
+ return brokerCandidateCache;
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/package-info.java
new file mode 100644
index 00000000000..76e0c4a8942
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7f9128de817..810fda320af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -21,10 +21,15 @@ package
org.apache.pulsar.broker.loadbalance.extensions.scheduler;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
+import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
@@ -32,12 +37,15 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
@@ -71,6 +79,7 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
private final LoadStats stats = new LoadStats();
private final PulsarService pulsar;
private final SimpleResourceAllocationPolicies allocationPolicies;
+ private final IsolationPoliciesHelper isolationPoliciesHelper;
private final UnloadDecision decision = new UnloadDecision();
@@ -78,11 +87,13 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
public TransferShedder(){
this.pulsar = null;
this.allocationPolicies = null;
+ this.isolationPoliciesHelper = null;
}
public TransferShedder(PulsarService pulsar){
this.pulsar = pulsar;
this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
+ this.isolationPoliciesHelper = new
IsolationPoliciesHelper(allocationPolicies);
}
@@ -265,6 +276,17 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
boolean transfer = conf.isLoadBalancerTransferEnabled();
+
+ Map<String, BrokerLookupData> availableBrokers;
+ try {
+ availableBrokers =
context.brokerRegistry().getAvailableBrokerLookupDataAsync()
+
.get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ } catch (ExecutionException | InterruptedException |
TimeoutException e) {
+ decision.skip(Unknown);
+ log.warn("Failed to fetch available brokers. Reason:{}. Stop
unloading.", decision.getReason(), e);
+ return decision;
+ }
+
while (true) {
if (!stats.hasTransferableBrokers()) {
if (debugMode) {
@@ -334,7 +356,9 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
int remainingTopBundles = topBundlesLoadData.size();
for (var e : topBundlesLoadData) {
String bundle = e.bundleName();
- if (!recentlyUnloadedBundles.containsKey(bundle) &&
isTransferable(bundle)) {
+ if (!recentlyUnloadedBundles.containsKey(bundle)
+ && isTransferable(context, availableBrokers,
+ bundle, maxBroker, Optional.of(minBroker))) {
var bundleData = e.stats();
double throughput = bundleData.msgThroughputIn +
bundleData.msgThroughputOut;
if (remainingTopBundles > 1
@@ -342,8 +366,7 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
|| !atLeastOneBundleSelected)) {
if (transfer) {
selectedBundlesCache.put(maxBroker,
- new Unload(maxBroker, bundle,
- Optional.of(minBroker)));
+ new Unload(maxBroker, bundle,
Optional.of(minBroker)));
} else {
selectedBundlesCache.put(maxBroker,
new Unload(maxBroker, bundle));
@@ -412,18 +435,27 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
}
- private boolean isTransferable(String bundle) {
+ private boolean isTransferable(LoadManagerContext context,
+ Map<String, BrokerLookupData>
availableBrokers,
+ String bundle,
+ String maxBroker,
+ Optional<String> broker) {
if (pulsar == null || allocationPolicies == null) {
return true;
}
- NamespaceName namespace =
NamespaceName.get(LoadManagerShared.getNamespaceNameFromBundleName(bundle));
- if (allocationPolicies.areIsolationPoliciesPresent(namespace)) {
+ String namespace =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+ NamespaceName namespaceName = NamespaceName.get(namespace);
+ final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+ NamespaceBundle namespaceBundle =
+
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace,
bundleRange);
+
+ if (!canTransferWithIsolationPoliciesToBroker(context,
availableBrokers, namespaceBundle, maxBroker, broker)) {
return false;
}
try {
var localPoliciesOptional = pulsar
-
.getPulsarResources().getLocalPolicies().getLocalPolicies(namespace);
+
.getPulsarResources().getLocalPolicies().getLocalPolicies(namespaceName);
if (localPoliciesOptional.isPresent() && StringUtils.isNotBlank(
localPoliciesOptional.get().namespaceAntiAffinityGroup)) {
return false;
@@ -434,4 +466,37 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
}
return true;
}
+
+ /**
+ * Check the gave bundle and broker can be transfer or unload with
isolation policies applied.
+ *
+ * @param context The load manager context.
+ * @param availableBrokers The available brokers.
+ * @param namespaceBundle The bundle try to unload or transfer.
+ * @param currentBroker The current broker.
+ * @param targetBroker The broker will be transfer to.
+ * @return Can be transfer/unload or not.
+ */
+ private boolean
canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+ Map<String,
BrokerLookupData> availableBrokers,
+ NamespaceBundle
namespaceBundle,
+ String
currentBroker,
+ Optional<String>
targetBroker) {
+ if (isolationPoliciesHelper == null
+ ||
!allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject()))
{
+ return true;
+ }
+ boolean transfer =
context.brokerConfiguration().isLoadBalancerTransferEnabled();
+ Set<String> candidates =
isolationPoliciesHelper.applyIsolationPolicies(availableBrokers,
namespaceBundle);
+
+ // Remove the current bundle owner broker.
+ candidates.remove(currentBroker);
+
+ // Unload: Check if there are any more candidates available for
selection.
+ if (targetBroker.isEmpty() || !transfer) {
+ return !candidates.isEmpty();
+ }
+ // Transfer: Check if this broker is among the candidates.
+ return candidates.contains(targetBroker.get());
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index ec82f5c383e..441415a9d35 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
@@ -290,12 +291,19 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
return "Mock broker filter";
}
+ @Override
+ public void initialize(PulsarService pulsar) {
+ // No-op
+ }
+
@Override
public Map<String, BrokerLookupData> filter(Map<String,
BrokerLookupData> brokers,
- LoadManagerContext
context) {
+ ServiceUnitId
serviceUnit,
+ LoadManagerContext
context) throws BrokerFilterException {
brokers.remove(pulsar1.getLookupServiceAddress());
return brokers;
}
+
})).when(primaryLoadManager).getBrokerFilterPipeline();
Optional<BrokerLookupData> brokerLookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
@@ -308,14 +316,10 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
TopicName topicName = TopicName.get("test-filter-has-exception");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
- doReturn(List.of(new BrokerFilter() {
- @Override
- public String name() {
- return "Mock broker filter";
- }
-
+ doReturn(List.of(new MockBrokerFilter() {
@Override
public Map<String, BrokerLookupData> filter(Map<String,
BrokerLookupData> brokers,
+ ServiceUnitId
serviceUnit,
LoadManagerContext
context) throws BrokerFilterException {
brokers.clear();
throw new BrokerFilterException("Test");
@@ -379,6 +383,35 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
}
+ @Test
+ public void testMoreThenOneFilter() throws Exception {
+ TopicName topicName = TopicName.get("test-filter-has-exception");
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+ String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
+ doReturn(List.of(new MockBrokerFilter() {
+ @Override
+ public Map<String, BrokerLookupData> filter(Map<String,
BrokerLookupData> brokers,
+ ServiceUnitId
serviceUnit,
+ LoadManagerContext
context) throws BrokerFilterException {
+ brokers.remove(lookupServiceAddress1);
+ return brokers;
+ }
+ },new MockBrokerFilter() {
+ @Override
+ public Map<String, BrokerLookupData> filter(Map<String,
BrokerLookupData> brokers,
+ ServiceUnitId
serviceUnit,
+ LoadManagerContext
context) throws BrokerFilterException {
+ brokers.clear();
+ throw new BrokerFilterException("Test");
+ }
+ })).when(primaryLoadManager).getBrokerFilterPipeline();
+
+ Optional<BrokerLookupData> brokerLookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
+ assertTrue(brokerLookupData.isPresent());
+ assertEquals(brokerLookupData.get().getWebServiceUrl(),
pulsar2.getWebServiceAddress());
+ }
+
@Test
public void testGetMetrics() throws Exception {
{
@@ -565,6 +598,20 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
assertEquals(actual, expected);
}
+ private static abstract class MockBrokerFilter implements BrokerFilter {
+
+ @Override
+ public String name() {
+ return "Mock-broker-filter";
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ // No-op
+ }
+
+ }
+
private static void cleanTableView(ServiceUnitStateChannel channel)
throws IllegalAccessException {
var tv = (TableViewImpl<ServiceUnitStateData>)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
new file mode 100644
index 00000000000..a079a23bcea
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
+import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link BrokerIsolationPoliciesFilter}.
+ */
+@Test(groups = "broker")
+public class BrokerIsolationPoliciesFilterTest {
+
+ /**
+ * It verifies namespace-isolation policies with primary and secondary
brokers.
+ *
+ * usecase:
+ *
+ * <pre>
+ * 1. Namespace: primary=broker1, secondary=broker2, shared=broker3,
min_limit = 1
+ * a. available-brokers: broker1, broker2, broker3 => result: broker1
+ * b. available-brokers: broker2, broker3 => result: broker2
+ * c. available-brokers: broker3 => result: NULL
+ * 2. Namespace: primary=broker1, secondary=broker2, shared=broker3,
min_limit = 2
+ * a. available-brokers: broker1, broker2, broker3 => result: broker1,
broker2
+ * b. available-brokers: broker2, broker3 => result: broker2
+ * c. available-brokers: broker3 => result: NULL
+ * </pre>
+ */
+ @Test
+ public void
testFilterWithNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers()
+ throws IllegalAccessException, BrokerFilterException {
+ var namespace = "my-tenant/my-ns";
+ NamespaceName namespaceName = NamespaceName.get(namespace);
+
+ BrokerIsolationPoliciesFilter filter = new
BrokerIsolationPoliciesFilter();
+ var policies = mock(SimpleResourceAllocationPolicies.class);
+
+ // 1. Namespace: primary=broker1, secondary=broker2, shared=broker3,
min_limit = 1
+ setIsolationPolicies(policies, namespaceName, Set.of("broker1"),
Set.of("broker2"), Set.of("broker3"), 1);
+ IsolationPoliciesHelper isolationPoliciesHelper = new
IsolationPoliciesHelper(policies);
+ FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper",
isolationPoliciesHelper, true);
+
+ // a. available-brokers: broker1, broker2, broker3 => result: broker1
+ Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(Map.of(
+ "broker1", getLookupData(),
+ "broker2", getLookupData(),
+ "broker3", getLookupData())), namespaceName, getContext());
+ assertEquals(result.keySet(), Set.of("broker1"));
+
+ // b. available-brokers: broker2, broker3 => result: broker2
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker2", getLookupData(),
+ "broker3", getLookupData())), namespaceName, getContext());
+ assertEquals(result.keySet(), Set.of("broker2"));
+
+ // c. available-brokers: broker3 => result: NULL
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker3", getLookupData())), namespaceName, getContext());
+ assertTrue(result.isEmpty());
+
+ // 2. Namespace: primary=broker1, secondary=broker2, shared=broker3,
min_limit = 2
+ setIsolationPolicies(policies, namespaceName, Set.of("broker1"),
Set.of("broker2"), Set.of("broker3"), 2);
+
+ // a. available-brokers: broker1, broker2, broker3 => result: broker1,
broker2
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker1", getLookupData(),
+ "broker2", getLookupData(),
+ "broker3", getLookupData())), namespaceName, getContext());
+ assertEquals(result.keySet(), Set.of("broker1", "broker2"));
+
+ // b. available-brokers: broker2, broker3 => result: broker2
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker2", getLookupData(),
+ "broker3", getLookupData())), namespaceName, getContext());
+ assertEquals(result.keySet(), Set.of("broker2"));
+
+ // c. available-brokers: broker3 => result: NULL
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker3", getLookupData())), namespaceName, getContext());
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testFilterWithPersistentOrNonPersistentDisabled()
+ throws IllegalAccessException, BrokerFilterException {
+ var namespace = "my-tenant/my-ns";
+ NamespaceName namespaceName = NamespaceName.get(namespace);
+ NamespaceBundle namespaceBundle = mock(NamespaceBundle.class);
+ doReturn(true).when(namespaceBundle).hasNonPersistentTopic();
+ doReturn(namespaceName).when(namespaceBundle).getNamespaceObject();
+
+ BrokerIsolationPoliciesFilter filter = new
BrokerIsolationPoliciesFilter();
+
+ var policies = mock(SimpleResourceAllocationPolicies.class);
+
doReturn(false).when(policies).areIsolationPoliciesPresent(eq(namespaceName));
+ doReturn(true).when(policies).isSharedBroker(any());
+ IsolationPoliciesHelper isolationPoliciesHelper = new
IsolationPoliciesHelper(policies);
+ FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper",
isolationPoliciesHelper, true);
+
+ Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(Map.of(
+ "broker1", getLookupData(),
+ "broker2", getLookupData(),
+ "broker3", getLookupData())), namespaceBundle, getContext());
+ assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+
+
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker1", getLookupData(true, false),
+ "broker2", getLookupData(true, false),
+ "broker3", getLookupData())), namespaceBundle, getContext());
+ assertEquals(result.keySet(), Set.of("broker3"));
+
+ doReturn(false).when(namespaceBundle).hasNonPersistentTopic();
+
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker1", getLookupData(),
+ "broker2", getLookupData(),
+ "broker3", getLookupData())), namespaceBundle, getContext());
+ assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+
+ result = filter.filter(new HashMap<>(Map.of(
+ "broker1", getLookupData(false, true),
+ "broker2", getLookupData(),
+ "broker3", getLookupData())), namespaceBundle, getContext());
+ assertEquals(result.keySet(), Set.of("broker2", "broker3"));
+ }
+
+ private void setIsolationPolicies(SimpleResourceAllocationPolicies
policies,
+ NamespaceName namespaceName,
+ Set<String> primary,
+ Set<String> secondary,
+ Set<String> shared,
+ int min_limit) {
+ reset(policies);
+
doReturn(true).when(policies).areIsolationPoliciesPresent(eq(namespaceName));
+ doReturn(false).when(policies).isPrimaryBroker(eq(namespaceName),
any());
+ doReturn(false).when(policies).isSecondaryBroker(eq(namespaceName),
any());
+ doReturn(false).when(policies).isSharedBroker(any());
+
+ primary.forEach(broker -> {
+ doReturn(true).when(policies).isPrimaryBroker(eq(namespaceName),
eq(broker));
+ });
+
+ secondary.forEach(broker -> {
+ doReturn(true).when(policies).isSecondaryBroker(eq(namespaceName),
eq(broker));
+ });
+
+ shared.forEach(broker -> {
+ doReturn(true).when(policies).isSharedBroker(eq(broker));
+ });
+
+ doAnswer(invocationOnMock -> {
+ Integer totalPrimaryCandidates = invocationOnMock.getArgument(1,
Integer.class);
+ return totalPrimaryCandidates < min_limit;
+ }).when(policies).shouldFailoverToSecondaries(eq(namespaceName),
anyInt());
+ }
+
+ public BrokerLookupData getLookupData() {
+ return getLookupData(true, true);
+ }
+
+ public BrokerLookupData getLookupData(boolean persistentTopicsEnabled,
+ boolean nonPersistentTopicsEnabled) {
+ String webServiceUrl = "http://localhost:8080";
+ String webServiceUrlTls = "https://localhoss:8081";
+ String pulsarServiceUrl = "pulsar://localhost:6650";
+ String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
+ Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+ Map<String, String> protocols = new HashMap<>(){{
+ put("kafka", "9092");
+ }};
+ return new BrokerLookupData(
+ webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+ pulsarServiceUrlTls, advertisedListeners, protocols,
+ persistentTopicsEnabled, nonPersistentTopicsEnabled, "3.0.0");
+ }
+
+ public LoadManagerContext getContext() {
+ LoadManagerContext mockContext = mock(LoadManagerContext.class);
+ doReturn(new
ServiceConfiguration()).when(mockContext).brokerConfiguration();
+ return mockContext;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
index 4c3255341b7..da13a9526a8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
@@ -58,7 +58,7 @@ public class BrokerMaxTopicCountFilterTest extends
BrokerFilterTestBase {
"broker3", getLookupData(),
"broker4", getLookupData()
);
- Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(originalBrokers), context);
+ Map<String, BrokerLookupData> result = filter.filter(new
HashMap<>(originalBrokers), null, context);
assertEquals(result, Map.of(
"broker2", getLookupData(),
"broker4", getLookupData()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
index d36c79d60ed..cafd8f0ea7a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
@@ -41,7 +41,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
@Test
public void testFilterEmptyBrokerList() throws BrokerFilterException {
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new
HashMap<>(), getContext());
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new
HashMap<>(), null, getContext());
assertTrue(result.isEmpty());
}
@@ -58,7 +58,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
);
Map<String, BrokerLookupData> brokers = new HashMap<>(originalBrokers);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result =
brokerVersionFilter.filter(brokers, context);
+ Map<String, BrokerLookupData> result =
brokerVersionFilter.filter(brokers, null, context);
assertEquals(result, originalBrokers);
}
@@ -71,7 +71,8 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
"localhost:6653", getLookupData("2.10.1")
);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new
HashMap<>(originalBrokers), getContext());
+ Map<String, BrokerLookupData> result = brokerVersionFilter.filter(
+ new HashMap<>(originalBrokers), null, getContext());
assertEquals(result, Map.of(
"localhost:6651", getLookupData("2.10.1"),
"localhost:6652", getLookupData("2.10.1"),
@@ -84,7 +85,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
"localhost:6652", getLookupData("2.10.1"),
"localhost:6653", getLookupData("2.10.1")
);
- result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
getContext());
+ result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
null, getContext());
assertEquals(result, Map.of(
"localhost:6652", getLookupData("2.10.1"),
@@ -98,7 +99,7 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
"localhost:6653", getLookupData("2.10.2-SNAPSHOT")
);
- result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
getContext());
+ result = brokerVersionFilter.filter(new HashMap<>(originalBrokers),
null, getContext());
assertEquals(result, Map.of(
"localhost:6653", getLookupData("2.10.2-SNAPSHOT")
));
@@ -111,6 +112,6 @@ public class BrokerVersionFilterTest extends
BrokerFilterTestBase {
"localhost:6650", getLookupData("xxx")
);
BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
- brokerVersionFilter.filter(new HashMap<>(originalBrokers),
getContext());
+ brokerVersionFilter.filter(new HashMap<>(originalBrokers), null,
getContext());
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 709a1113f35..a668d85f007 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -28,9 +28,15 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
+import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -42,24 +48,37 @@ import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.commons.math3.stat.descriptive.moment.Mean;
import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -145,6 +164,19 @@ public class TransferShedderTest {
return topKBundles.getLoadData();
}
+ public TopBundlesLoadData getTopBundlesLoadWithOutSuffix(String namespace,
+ int load1,
+ int load2) {
+ var namespaceBundleStats1 = new NamespaceBundleStats();
+ namespaceBundleStats1.msgThroughputOut = load1 * 1e6;
+ var namespaceBundleStats2 = new NamespaceBundleStats();
+ namespaceBundleStats2.msgThroughputOut = load2 * 1e6;
+ var topKBundles = new TopKBundles();
+ topKBundles.update(Map.of(namespace + "/0x00000000_0x7FFFFFF",
namespaceBundleStats1,
+ namespace + "/0x7FFFFFF_0xFFFFFFF", namespaceBundleStats2), 2);
+ return topKBundles.getLoadData();
+ }
+
public LoadManagerContext getContext(){
var ctx = mock(LoadManagerContext.class);
var conf = new ServiceConfiguration();
@@ -236,6 +268,10 @@ public class TransferShedderTest {
doReturn(conf).when(ctx).brokerConfiguration();
doReturn(brokerLoadDataStore).when(ctx).brokerLoadDataStore();
doReturn(topBundleLoadDataStore).when(ctx).topBundleLoadDataStore();
+ var brokerRegister = mock(BrokerRegistry.class);
+ doReturn(brokerRegister).when(ctx).brokerRegistry();
+ BrokerRegistry registry = ctx.brokerRegistry();
+
doReturn(CompletableFuture.completedFuture(Map.of())).when(registry).getAvailableBrokerLookupDataAsync();
return ctx;
}
@@ -350,33 +386,187 @@ public class TransferShedderTest {
}
@Test
- public void testBundlesWithIsolationPolicies() throws
IllegalAccessException {
- var pulsar = mock(PulsarService.class);
+ public void testGetAvailableBrokersFailed() {
+ TransferShedder transferShedder = new TransferShedder();
+ var ctx = setupContext();
+ BrokerRegistry registry = ctx.brokerRegistry();
+ doReturn(FutureUtil.failedFuture(new
TimeoutException())).when(registry).getAvailableBrokerLookupDataAsync();
+ var res = transferShedder.findBundlesForUnloading(ctx, Map.of(),
Map.of());
+
+ var expected = new UnloadDecision();
+ expected.setLabel(Skip);
+ expected.skip(Unknown);
+ expected.setLoadAvg(setupLoadAvg);
+ expected.setLoadStd(setupLoadStd);
+ assertEquals(res, expected);
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testBundlesWithIsolationPolicies() throws
IllegalAccessException, MetadataStoreException {
+ var pulsar = getMockPulsar();
+
TransferShedder transferShedder = new TransferShedder(pulsar);
+
+ var pulsarResourcesMock = mock(PulsarResources.class);
+ var localPoliciesResourcesMock = mock(LocalPoliciesResources.class);
+ doReturn(pulsarResourcesMock).when(pulsar).getPulsarResources();
+
doReturn(localPoliciesResourcesMock).when(pulsarResourcesMock).getLocalPolicies();
+
doReturn(Optional.empty()).when(localPoliciesResourcesMock).getLocalPolicies(any());
+
var allocationPoliciesSpy = (SimpleResourceAllocationPolicies)
spy(FieldUtils.readDeclaredField(transferShedder,
"allocationPolicies", true));
-
doReturn(true).when(allocationPoliciesSpy).areIsolationPoliciesPresent(any());
FieldUtils.writeDeclaredField(transferShedder, "allocationPolicies",
allocationPoliciesSpy, true);
+ IsolationPoliciesHelper isolationPoliciesHelper = new
IsolationPoliciesHelper(allocationPoliciesSpy);
+ FieldUtils.writeDeclaredField(transferShedder,
"isolationPoliciesHelper", isolationPoliciesHelper, true);
+
+ // Test transfer to a has isolation policies broker.
+ setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE",
+ Set.of("broker5"), Set.of(), Set.of(), 1);
var ctx = setupContext();
+ BrokerRegistry registry = ctx.brokerRegistry();
+ doReturn(CompletableFuture.completedFuture(Map.of(
+ "broker1", getLookupData(),
+ "broker2", getLookupData(),
+ "broker3", getLookupData(),
+ "broker4", getLookupData(),
+ "broker5",
getLookupData()))).when(registry).getAvailableBrokerLookupDataAsync();
+
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1",
getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
- topBundlesLoadDataStore.pushAsync("broker2",
getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
- topBundlesLoadDataStore.pushAsync("broker3",
getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
- topBundlesLoadDataStore.pushAsync("broker4",
getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
- topBundlesLoadDataStore.pushAsync("broker5",
getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+ topBundlesLoadDataStore.pushAsync("broker1",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceA", 1,
3));
+ topBundlesLoadDataStore.pushAsync("broker2",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceB", 2,
8));
+ topBundlesLoadDataStore.pushAsync("broker3",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceC", 6,
10));
+ topBundlesLoadDataStore.pushAsync("broker4",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceD", 10,
20));
+ topBundlesLoadDataStore.pushAsync("broker5",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceE", 70,
20));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(),
Map.of());
var expected = new UnloadDecision();
- expected.setLabel(Skip);
- expected.skip(NoBundles);
+ var unloads = expected.getUnloads();
+ unloads.put("broker4",
+ new Unload("broker4",
"my-tenant/my-namespaceD/0x7FFFFFF_0xFFFFFFF", Optional.of("broker2")));
+ expected.setLabel(Success);
+ expected.setReason(Overloaded);
+ expected.setLoadAvg(setupLoadAvg);
+ expected.setLoadStd(setupLoadStd);
+ assertEquals(res, expected);
+
+ // Test unload a has isolation policies broker.
+
+ setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE",
+ Set.of("broker5"), Set.of(), Set.of(), 1);
+ ctx = setupContext();
+ registry = ctx.brokerRegistry();
+ doReturn(CompletableFuture.completedFuture(Map.of(
+ "broker1", getLookupData(),
+ "broker2", getLookupData(),
+ "broker3", getLookupData(),
+ "broker4", getLookupData(),
+ "broker5",
getLookupData()))).when(registry).getAvailableBrokerLookupDataAsync();
+
+ ctx.brokerConfiguration().setLoadBalancerTransferEnabled(false);
+
+ topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+ topBundlesLoadDataStore.pushAsync("broker1",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceA", 1,
3));
+ topBundlesLoadDataStore.pushAsync("broker2",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceB", 2,
8));
+ topBundlesLoadDataStore.pushAsync("broker3",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceC", 6,
10));
+ topBundlesLoadDataStore.pushAsync("broker4",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceD", 10,
20));
+ topBundlesLoadDataStore.pushAsync("broker5",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceE", 70,
20));
+ res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+ expected = new UnloadDecision();
+ unloads = expected.getUnloads();
+ unloads.put("broker4",
+ new Unload("broker4",
"my-tenant/my-namespaceD/0x7FFFFFF_0xFFFFFFF", Optional.empty()));
+ expected.setLabel(Success);
+ expected.setReason(Overloaded);
expected.setLoadAvg(setupLoadAvg);
expected.setLoadStd(setupLoadStd);
assertEquals(res, expected);
}
+ public BrokerLookupData getLookupData() {
+ String webServiceUrl = "http://localhost:8080";
+ String webServiceUrlTls = "https://localhoss:8081";
+ String pulsarServiceUrl = "pulsar://localhost:6650";
+ String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
+ Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+ Map<String, String> protocols = new HashMap<>(){{
+ put("kafka", "9092");
+ }};
+ return new BrokerLookupData(
+ webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+ pulsarServiceUrlTls, advertisedListeners, protocols,
+ true, true, "3.0.0");
+ }
+
+ private void setIsolationPolicies(SimpleResourceAllocationPolicies
policies,
+ String namespace,
+ Set<String> primary,
+ Set<String> secondary,
+ Set<String> shared,
+ int min_limit) {
+ reset(policies);
+ NamespaceName namespaceName = NamespaceName.get(namespace);
+ NamespaceBundle namespaceBundle = mock(NamespaceBundle.class);
+ doReturn(true).when(namespaceBundle).hasNonPersistentTopic();
+ doReturn(namespaceName).when(namespaceBundle).getNamespaceObject();
+ doReturn(false).when(policies).areIsolationPoliciesPresent(any());
+ doReturn(false).when(policies).isPrimaryBroker(any(), any());
+ doReturn(false).when(policies).isSecondaryBroker(any(), any());
+ doReturn(true).when(policies).isSharedBroker(any());
+
+
doReturn(true).when(policies).areIsolationPoliciesPresent(eq(namespaceName));
+
+ primary.forEach(broker -> {
+ doReturn(true).when(policies).isPrimaryBroker(eq(namespaceName),
eq(broker));
+ });
+
+ secondary.forEach(broker -> {
+ doReturn(true).when(policies).isSecondaryBroker(eq(namespaceName),
eq(broker));
+ });
+
+ shared.forEach(broker -> {
+ doReturn(true).when(policies).isSharedBroker(eq(broker));
+ });
+
+ doAnswer(invocationOnMock -> {
+ Integer totalPrimaryCandidates = invocationOnMock.getArgument(1,
Integer.class);
+ return totalPrimaryCandidates < min_limit;
+ }).when(policies).shouldFailoverToSecondaries(eq(namespaceName),
anyInt());
+ }
+
+ private PulsarService getMockPulsar() {
+ var pulsar = mock(PulsarService.class);
+ var namespaceService = mock(NamespaceService.class);
+ doReturn(namespaceService).when(pulsar).getNamespaceService();
+ NamespaceBundleFactory factory = mock(NamespaceBundleFactory.class);
+ doReturn(factory).when(namespaceService).getNamespaceBundleFactory();
+ doAnswer(answer -> {
+ String namespace = answer.getArgument(0, String.class);
+ String bundleRange = answer.getArgument(1, String.class);
+ String[] boundaries = bundleRange.split("_");
+ Long lowerEndpoint = Long.decode(boundaries[0]);
+ Long upperEndpoint = Long.decode(boundaries[1]);
+ Range<Long> hashRange = Range.range(lowerEndpoint,
BoundType.CLOSED, upperEndpoint,
+ (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND))
? BoundType.CLOSED : BoundType.OPEN);
+ return new NamespaceBundle(NamespaceName.get(namespace),
hashRange, factory);
+ }).when(factory).getBundle(anyString(), anyString());
+ return pulsar;
+ }
+
+
@Test
public void testBundlesWithAntiAffinityGroup() throws
IllegalAccessException, MetadataStoreException {
- var pulsar = mock(PulsarService.class);
+ var pulsar = getMockPulsar();
TransferShedder transferShedder = new TransferShedder(pulsar);
var allocationPoliciesSpy = (SimpleResourceAllocationPolicies)
spy(FieldUtils.readDeclaredField(transferShedder,
"allocationPolicies", true));
@@ -392,11 +582,16 @@ public class TransferShedderTest {
var ctx = setupContext();
var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
- topBundlesLoadDataStore.pushAsync("broker1",
getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
- topBundlesLoadDataStore.pushAsync("broker2",
getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
- topBundlesLoadDataStore.pushAsync("broker3",
getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
- topBundlesLoadDataStore.pushAsync("broker4",
getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
- topBundlesLoadDataStore.pushAsync("broker5",
getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+ topBundlesLoadDataStore.pushAsync("broker1",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceA", 1,
3));
+ topBundlesLoadDataStore.pushAsync("broker2",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceB", 2,
8));
+ topBundlesLoadDataStore.pushAsync("broker3",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceC", 6,
10));
+ topBundlesLoadDataStore.pushAsync("broker4",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceD", 10,
20));
+ topBundlesLoadDataStore.pushAsync("broker5",
+ getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceE", 70,
20));
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(),
Map.of());
var expected = new UnloadDecision();