This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c3126c684a9 [improve][broker] PIP-192: Implement extensible load
manager (#19102)
c3126c684a9 is described below
commit c3126c684a91f2c677fbe2eb0e35037d77c4b116
Author: Kai Wang <[email protected]>
AuthorDate: Fri Feb 3 15:05:06 2023 +0800
[improve][broker] PIP-192: Implement extensible load manager (#19102)
PIP: #16691
### Motivation
Implement extensible load manager.
### Modifications
For the PIP-192, this PR adds `ExtensibleLoadManagerImpl` and unit tests.
This PR also changes:
1. Added `CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
Optional<ServiceUnitId> topic, ServiceUnitId bundle)` to
`LoadManager ` interface.
2. Added `CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle)` to
`LoadManager` interface.
3. Change `CompletableFuture<String> getOwnerAsync(String serviceUnit)` to
`CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit)` to
unify the result.
---
.../org/apache/pulsar/broker/PulsarService.java | 26 +-
.../pulsar/broker/loadbalance/LoadManager.java | 18 ++
.../extensions/ExtensibleLoadManager.java | 9 +
.../extensions/ExtensibleLoadManagerImpl.java | 266 +++++++++++++++++++
.../extensions/ExtensibleLoadManagerWrapper.java | 146 +++++++++++
.../extensions/LoadManagerContextImpl.java | 60 +++++
.../channel/ServiceUnitStateChannel.java | 4 +-
.../channel/ServiceUnitStateChannelImpl.java | 13 +-
.../extensions/filter/BrokerFilter.java | 6 +-
.../strategy/BrokerSelectionStrategy.java | 4 +-
.../strategy/LeastResourceUsageWithWeight.java | 4 +-
.../extensions/strategy/package-info.java | 2 +-
.../pulsar/broker/namespace/NamespaceService.java | 51 +++-
.../extensions/ExtensibleLoadManagerImplTest.java | 287 +++++++++++++++++++++
.../channel/ServiceUnitStateChannelTest.java | 68 +++--
.../strategy/LeastResourceUsageWithWeightTest.java | 11 +-
16 files changed, 905 insertions(+), 70 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0164b51d2c2..ae0fb1a9f28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
@@ -818,6 +819,17 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.webSocketService.setLocalCluster(clusterData);
}
+ // Start the leader election service
+ startLeaderElectionService();
+
+ // By starting the Load manager service, the broker will also
become visible
+ // to the rest of the broker by creating the registration z-node.
This needs
+ // to be done only when the broker is fully operative.
+ //
+ // The load manager service and its service unit state channel
need to be initialized first
+ // (namespace service depends on load manager)
+ this.startLoadManagementService();
+
// Initialize namespace service, after service url assigned.
Should init zk and refresh self owner info.
this.nsService.initialize();
@@ -828,9 +840,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.topicPoliciesService.start();
- // Start the leader election service
- startLeaderElectionService();
-
// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();
@@ -859,11 +868,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.metricsGenerator = new MetricsGenerator(this);
- // By starting the Load manager service, the broker will also
become visible
- // to the rest of the broker by creating the registration z-node.
This needs
- // to be done only when the broker is fully operative.
- this.startLoadManagementService();
-
// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
// so that the protocol handlers can access broker service
properly.
@@ -1103,6 +1107,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
protected void startLeaderElectionService() {
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ LOG.info("The load manager extension is enabled. Skipping
PulsarService LeaderElectionService.");
+ return;
+ }
this.leaderElectionService = new
LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
state -> {
if (state == LeaderElectionState.Leading) {
@@ -1207,7 +1215,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
LOG.info("Starting load management service ...");
this.loadManager.get().start();
- if (config.isLoadBalancerEnabled()) {
+ if (config.isLoadBalancerEnabled() &&
!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval =
config.getLoadBalancerReportUpdateMinIntervalMillis();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index b4df5d31968..17bff57b85c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -25,8 +25,12 @@ import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.Reflections;
@@ -58,6 +62,15 @@ public interface LoadManager {
*/
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
+ default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ throw new UnsupportedOperationException();
+ }
+
+ default CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Generate the load report.
*/
@@ -145,6 +158,11 @@ public interface LoadManager {
final LoadManager casted = new
ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
casted.initialize(pulsar);
return casted;
+ } else if (loadManagerInstance instanceof ExtensibleLoadManager) {
+ final LoadManager casted =
+ new
ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance);
+ casted.initialize(pulsar);
+ return casted;
}
} catch (Exception e) {
LOG.warn("Error when trying to create load manager: ", e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
index bb66bf731f4..b7da70d1cf1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
@@ -64,6 +64,15 @@ public interface ExtensibleLoadManager extends Closeable {
*/
CompletableFuture<Optional<BrokerLookupData>>
assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
+ /**
+ * Check the incoming service unit is owned by the current broker.
+ *
+ * @param topic The optional topic, some method won't provide topic var in
this param.
+ * @param serviceUnit The service unit (e.g. bundle).
+ * @return The broker lookup data.
+ */
+ CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId>
topic, ServiceUnitId serviceUnit);
+
/**
* Close the load manager.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
new file mode 100644
index 00000000000..d95bacd157e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
+import
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+ public static final String BROKER_LOAD_DATA_STORE_TOPIC = TopicName.get(
+ TopicDomain.non_persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE,
+ "loadbalancer-broker-load-data").toString();
+
+ public static final String TOP_BUNDLES_LOAD_DATA_STORE_TOPIC =
TopicName.get(
+ TopicDomain.non_persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE,
+ "loadbalancer-top-bundles-load-data").toString();
+
+ private PulsarService pulsar;
+
+ private ServiceConfiguration conf;
+
+ @Getter
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceUnitStateChannel serviceUnitStateChannel;
+
+ private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
+ private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
+
+ @Getter
+ private LoadManagerContext context;
+
+ @Getter
+ private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+ @Getter
+ private final List<BrokerFilter> brokerFilterPipeline;
+
+ private boolean started = false;
+
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<BrokerLookupData>>>
+ lookupRequests = ConcurrentOpenHashMap.<String,
+ CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+ .build();
+
+ /**
+ * Life cycle: Constructor -> initialize -> start -> close.
+ */
+ public ExtensibleLoadManagerImpl() {
+ this.brokerFilterPipeline = new ArrayList<>();
+ // TODO: Make brokerSelectionStrategy configurable.
+ this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
+ }
+
+ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration
conf) {
+ return
ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ if (this.started) {
+ return;
+ }
+ this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+ this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+ this.brokerRegistry.start();
+ this.serviceUnitStateChannel.start();
+
+ try {
+ this.brokerLoadDataStore = LoadDataStoreFactory
+ .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC,
BrokerLoadData.class);
+ this.topBundlesLoadDataStore = LoadDataStoreFactory
+ .create(pulsar.getClient(),
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+ } catch (LoadDataStoreException e) {
+ throw new PulsarServerException(e);
+ }
+
+ this.context = LoadManagerContextImpl.builder()
+ .configuration(conf)
+ .brokerRegistry(brokerRegistry)
+ .brokerLoadDataStore(brokerLoadDataStore)
+ .topBundleLoadDataStore(topBundlesLoadDataStore).build();
+ // TODO: Start load data reporter.
+
+ // TODO: Start unload scheduler and bundle split scheduler
+ this.started = true;
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ this.pulsar = pulsar;
+ this.conf = pulsar.getConfiguration();
+ }
+
+ @Override
+ public CompletableFuture<Optional<BrokerLookupData>>
assign(Optional<ServiceUnitId> topic,
+ ServiceUnitId
serviceUnit) {
+
+ final String bundle = serviceUnit.toString();
+
+ CompletableFuture<Optional<BrokerLookupData>> future =
lookupRequests.computeIfAbsent(bundle, k -> {
+ final CompletableFuture<Optional<String>> owner;
+ // Assign the bundle to channel owner if is internal topic, to
avoid circular references.
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner =
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+ // If the bundle not assign yet, select and publish assign
event to channel.
+ if (broker.isEmpty()) {
+ return
this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+ if (brokerOpt.isPresent()) {
+ log.info("Selected new owner broker: {} for
bundle: {}.", brokerOpt.get(), bundle);
+ return
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
+ .thenApply(Optional::of);
+ } else {
+ throw new IllegalStateException(
+ "Failed to select the new owner broker
for bundle: " + bundle);
+ }
+ });
+ }
+ // Already assigned, return it.
+ return CompletableFuture.completedFuture(broker);
+ });
+ }
+
+ return owner.thenCompose(broker -> {
+ if (broker.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(broker.get());
+ }).thenCompose(broker ->
this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+ if (brokerLookupData.isEmpty()) {
+ String errorMsg = String.format(
+ "Failed to look up a broker registry:%s for
bundle:%s", broker, bundle);
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+ return CompletableFuture.completedFuture(brokerLookupData);
+ }));
+ });
+ future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+ return future;
+ }
+
+ private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId
bundle) {
+ BrokerRegistry brokerRegistry = getBrokerRegistry();
+ return brokerRegistry.getAvailableBrokerLookupDataAsync()
+ .thenCompose(availableBrokers -> {
+ // TODO: Support isolation policies
+ LoadManagerContext context = this.getContext();
+
+ Map<String, BrokerLookupData> availableBrokerCandidates =
new HashMap<>(availableBrokers);
+
+ // Filter out brokers that do not meet the rules.
+ List<BrokerFilter> filterPipeline =
getBrokerFilterPipeline();
+ for (final BrokerFilter filter : filterPipeline) {
+ try {
+ filter.filter(availableBrokerCandidates, context);
+ } catch (BrokerFilterException e) {
+ // TODO: We may need to revisit this error case.
+ log.error("Failed to filter out brokers.", e);
+ availableBrokerCandidates = availableBrokers;
+ }
+ }
+ if (availableBrokerCandidates.isEmpty()) {
+ return
CompletableFuture.completedFuture(Optional.empty());
+ }
+ Set<String> candidateBrokers =
availableBrokerCandidates.keySet();
+
+ return CompletableFuture.completedFuture(
+
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
+ });
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
+ final String bundle = bundleUnit.toString();
+ CompletableFuture<Optional<String>> owner;
+ if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+ owner = serviceUnitStateChannel.getChannelOwnerAsync();
+ } else {
+ owner = serviceUnitStateChannel.getOwnerAsync(bundle);
+ }
+
+ return owner.thenApply(broker ->
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+ }
+
+ @Override
+ public void close() throws PulsarServerException {
+ if (!this.started) {
+ return;
+ }
+ try {
+ this.brokerLoadDataStore.close();
+ this.topBundlesLoadDataStore.close();
+ } catch (IOException ex) {
+ throw new PulsarServerException(ex);
+ } finally {
+ try {
+ this.brokerRegistry.close();
+ } finally {
+ try {
+ this.serviceUnitStateChannel.close();
+ } finally {
+ this.started = false;
+ }
+ }
+ }
+ }
+
+ private boolean isInternalTopic(String topic) {
+ return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
+ || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
+ || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
new file mode 100644
index 00000000000..6d5797eed66
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+ private PulsarService pulsar;
+
+ private final ExtensibleLoadManagerImpl loadManager;
+
+ public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager)
{
+ this.loadManager = loadManager;
+ }
+
+ @Override
+ public void start() throws PulsarServerException {
+ loadManager.start();
+ }
+
+ @Override
+ public void initialize(PulsarService pulsar) {
+ loadManager.initialize(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public boolean isCentralized() {
+ return true;
+ }
+
+ @Override
+ public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+ Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.assign(topic, bundle)
+ .thenApply(lookupData ->
lookupData.map(BrokerLookupData::toLookupResult));
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+ return loadManager.checkOwnershipAsync(topic, bundle);
+ }
+
+ @Override
+ public void disableBroker() throws Exception {
+ this.loadManager.getBrokerRegistry().unregister();
+ }
+
+ @Override
+ public Set<String> getAvailableBrokers() throws Exception {
+ return getAvailableBrokersAsync()
+
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
TimeUnit.SECONDS);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+ return
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+ }
+
+ @Override
+ public String setNamespaceBundleAffinity(String bundle, String broker) {
+ // TODO: Add namespace bundle affinity support.
+ return null;
+ }
+
+ @Override
+ public void stop() throws PulsarServerException {
+ this.loadManager.close();
+ }
+
+
+ @Override
+ public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws
Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LoadManagerReport generateLoadReport() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setLoadReportForceUpdateFlag() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeLoadReportOnZookeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeResourceQuotasToZooKeeper() throws Exception {
+ // No-op, this operation is not useful, the load data reporter will
automatically write.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<Metrics> getLoadBalancingMetrics() {
+ // TODO: Add metrics.
+ return null;
+ }
+
+ @Override
+ public void doLoadShedding() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doNamespaceBundleSplit() {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java
new file mode 100644
index 00000000000..5f78b88e22c
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import lombok.Builder;
+import lombok.Setter;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+
+@Setter
+@Builder
+public class LoadManagerContextImpl implements LoadManagerContext {
+
+ private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
+
+ private LoadDataStore<TopBundlesLoadData> topBundleLoadDataStore;
+
+ private BrokerRegistry brokerRegistry;
+
+ private ServiceConfiguration configuration;
+
+
+ @Override
+ public ServiceConfiguration brokerConfiguration() {
+ return this.configuration;
+ }
+
+ @Override
+ public LoadDataStore<BrokerLoadData> brokerLoadDataStore() {
+ return this.brokerLoadDataStore;
+ }
+
+ @Override
+ public LoadDataStore<TopBundlesLoadData> topBundleLoadDataStore() {
+ return this.topBundleLoadDataStore;
+ }
+
+ @Override
+ public BrokerRegistry brokerRegistry() {
+ return this.brokerRegistry;
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 238c433e177..fece425e75f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -116,9 +116,9 @@ public interface ServiceUnitStateChannel extends Closeable {
* the future object will complete and return the owner
broker.
* Sub-case2: If the assigned broker does not take the ownership in
time,
* the future object will time out.
- * Case 3: If none of them, it returns null.
+ * Case 3: If none of them, it returns Optional.empty().
*/
- CompletableFuture<String> getOwnerAsync(String serviceUnit);
+ CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit);
/**
* Asynchronously publishes the service unit assignment event to the
system topic in this channel.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 38e8afa50f3..37dfe6090bb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -63,6 +63,7 @@ import
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -276,21 +277,23 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
- public CompletableFuture<String> getOwnerAsync(String serviceUnit) {
+ public CompletableFuture<Optional<String>> getOwnerAsync(String
serviceUnit) {
validateChannelState(Started, true);
ServiceUnitStateData data = tableview.get(serviceUnit);
if (data == null) {
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(Optional.empty());
}
switch (data.state()) {
case Owned, Splitting -> {
- return CompletableFuture.completedFuture(data.broker());
+ return
CompletableFuture.completedFuture(Optional.of(data.broker()));
}
case Assigned, Released -> {
- return deferGetOwnerRequest(serviceUnit);
+ return
deferGetOwnerRequest(serviceUnit).thenApply(Optional::of);
}
default -> {
- return null;
+ String errorMsg = String.format("Failed to process service
unit state data: %s when get owner.", data);
+ log.error(errorMsg);
+ return FutureUtil.failedFuture(new
IllegalStateException(errorMsg));
}
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 4adc6aa1ce4..0a76446d3ce 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -18,9 +18,10 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.filter;
-import java.util.List;
+import java.util.Map;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
/**
* Filter out unqualified Brokers, which are not entered into LoadBalancer for
decision-making.
@@ -39,6 +40,7 @@ public interface BrokerFilter {
* @param context The load manager context.
* @return Filtered broker list.
*/
- List<String> filter(List<String> brokers, LoadManagerContext context)
throws BrokerFilterException;
+ Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData>
brokers, LoadManagerContext context)
+ throws BrokerFilterException;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
index 3b62777f1b6..e0a9122383c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
@@ -18,8 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance.extensions.strategy;
-import java.util.List;
import java.util.Optional;
+import java.util.Set;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -38,6 +38,6 @@ public interface BrokerSelectionStrategy {
* @param context
* The context contains information needed for selection
(load data, config, and etc).
*/
- Optional<String> select(List<String> brokers, ServiceUnitId bundle,
LoadManagerContext context);
+ Optional<String> select(Set<String> brokers, ServiceUnitId bundle,
LoadManagerContext context);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
index f48bab54f89..678927dac92 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
@@ -20,7 +20,6 @@ package
org.apache.pulsar.broker.loadbalance.extensions.strategy;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@@ -79,7 +78,8 @@ public class LeastResourceUsageWithWeight implements
BrokerSelectionStrategy {
* @return The name of the selected broker as it appears on ZooKeeper.
*/
@Override
- public Optional<String> select(List<String> candidates, ServiceUnitId
bundleToAssign, LoadManagerContext context) {
+ public Optional<String> select(
+ Set<String> candidates, ServiceUnitId bundleToAssign,
LoadManagerContext context) {
var conf = context.brokerConfiguration();
if (candidates.isEmpty()) {
log.info("There are no available brokers as candidates at this
point for bundle: {}", bundleToAssign);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
index d2349768352..846b528045a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.loadbalance.extensions.strategy;
\ No newline at end of file
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 84bce75bf5a..abbabcd3b00 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.resources.NamespaceResources;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -175,7 +176,14 @@ public class NamespaceService implements AutoCloseable {
long startTime = System.nanoTime();
CompletableFuture<Optional<LookupResult>> future =
getBundleAsync(topic)
- .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
+ .thenCompose(bundle -> {
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return
loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
+ } else {
+ // TODO: Add unit tests cover it.
+ return findBrokerServiceUrl(bundle, options);
+ }
+ });
future.thenAccept(optResult -> {
lookupLatency.observe(System.nanoTime() - startTime,
TimeUnit.NANOSECONDS);
@@ -232,16 +240,18 @@ public class NamespaceService implements AutoCloseable {
LOG.debug("Getting web service URL of topic: {} - options:
{}", name, options);
}
return getBundleAsync(name)
- .thenCompose(namespaceBundle ->
internalGetWebServiceUrl(namespaceBundle, options));
+ .thenCompose(namespaceBundle ->
+ internalGetWebServiceUrl(Optional.of(name),
namespaceBundle, options));
}
if (suName instanceof NamespaceName) {
return getFullBundleAsync((NamespaceName) suName)
- .thenCompose(namespaceBundle ->
internalGetWebServiceUrl(namespaceBundle, options));
+ .thenCompose(namespaceBundle ->
+ internalGetWebServiceUrl(Optional.empty(),
namespaceBundle, options));
}
if (suName instanceof NamespaceBundle) {
- return internalGetWebServiceUrl((NamespaceBundle) suName, options);
+ return internalGetWebServiceUrl(Optional.empty(),
(NamespaceBundle) suName, options);
}
throw new IllegalArgumentException("Unrecognized class of
NamespaceBundle: " + suName.getClass().getName());
@@ -257,9 +267,14 @@ public class NamespaceService implements AutoCloseable {
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
}
- private CompletableFuture<Optional<URL>>
internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {
+ private CompletableFuture<Optional<URL>>
internalGetWebServiceUrl(Optional<ServiceUnitId> topic,
+
NamespaceBundle bundle,
+
LookupOptions options) {
- return findBrokerServiceUrl(bundle, options).thenApply(lookupResult ->
{
+ return (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
+ ? loadManager.get().findBrokerServiceUrl(topic, bundle) :
+ // TODO: Add unit tests cover it.
+ findBrokerServiceUrl(bundle, options)).thenApply(lookupResult
-> {
if (lookupResult.isPresent()) {
try {
LookupData lookupData = lookupResult.get().getLookupData();
@@ -1024,6 +1039,10 @@ public class NamespaceService implements AutoCloseable {
}
if (suName instanceof NamespaceBundle) {
+ if
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return loadManager.get().checkOwnershipAsync(Optional.empty(),
suName);
+ }
+ // TODO: Add unit tests cover it.
return CompletableFuture.completedFuture(
ownershipCache.isNamespaceBundleOwned((NamespaceBundle)
suName));
}
@@ -1046,6 +1065,11 @@ public class NamespaceService implements AutoCloseable {
}
public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName
topicName) {
+ // TODO: Add unit tests cover it.
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return getBundleAsync(topicName)
+ .thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
+ }
Optional<CompletableFuture<OwnedBundle>> res =
ownershipCache.getOwnedBundleAsync(getBundle(topicName));
if (!res.isPresent()) {
return CompletableFuture.completedFuture(false);
@@ -1059,15 +1083,30 @@ public class NamespaceService implements AutoCloseable {
}
private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName
fqnn) {
+ // TODO: Add unit tests cover it.
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return getFullBundleAsync(fqnn)
+ .thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.empty(), bundle));
+ }
return getFullBundleAsync(fqnn)
.thenApply(bundle -> ownershipCache.getOwnedBundle(bundle) !=
null);
}
private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
+ // TODO: Add unit tests cover it.
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return getBundleAsync(topic)
+ .thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
+ }
return getBundleAsync(topic).thenApply(bundle ->
ownershipCache.isNamespaceBundleOwned(bundle));
}
public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName)
{
+ // TODO: Add unit tests cover it.
+ if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+ return getBundleAsync(topicName)
+ .thenCompose(bundle ->
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
+ }
return getBundleAsync(topicName)
.thenCompose(ownershipCache::checkOwnershipAsync);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
new file mode 100644
index 00000000000..d650567be8b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LeaderBroker;
+import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link ExtensibleLoadManagerImpl}.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest
{
+
+ private PulsarService pulsar1;
+ private PulsarService pulsar2;
+
+ private PulsarTestContext additionalPulsarTestContext;
+
+ private PulsarResources resources;
+
+ private ExtensibleLoadManagerImpl primaryLoadManager;
+
+ private ExtensibleLoadManagerImpl secondaryLoadManager;
+
+ private ServiceUnitStateChannelImpl channel1;
+ private ServiceUnitStateChannelImpl channel2;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ conf.setAllowAutoTopicCreation(true);
+
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ super.internalSetup(conf);
+ pulsar1 = pulsar;
+ ServiceConfiguration defaultConf = getDefaultConf();
+
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+ additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
+ pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+ ExtensibleLoadManagerWrapper primaryLoadManagerWrapper =
+ (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
+ primaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(primaryLoadManagerWrapper, "loadManager",
true));
+ FieldUtils.writeField(primaryLoadManagerWrapper, "loadManager",
primaryLoadManager, true);
+
+ ExtensibleLoadManagerWrapper secondaryLoadManagerWrapper =
+ (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
+ secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
+ FieldUtils.readField(secondaryLoadManagerWrapper,
"loadManager", true));
+ FieldUtils.writeField(secondaryLoadManagerWrapper, "loadManager",
secondaryLoadManager, true);
+
+ channel1 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(primaryLoadManager,
"serviceUnitStateChannel", true);
+ channel2 = (ServiceUnitStateChannelImpl)
+ FieldUtils.readField(secondaryLoadManager,
"serviceUnitStateChannel", true);
+
+ }
+
+ protected void beforePulsarStart(PulsarService pulsar) throws Exception {
+ if (resources == null) {
+ MetadataStoreExtended localStore =
pulsar.createLocalMetadataStore(null);
+ MetadataStoreExtended configStore = (MetadataStoreExtended)
pulsar.createConfigurationMetadataStore(null);
+ resources = new PulsarResources(localStore, configStore);
+ }
+ this.createNamespaceIfNotExists(resources,
NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ NamespaceName.SYSTEM_NAMESPACE);
+ }
+
+ protected void createNamespaceIfNotExists(PulsarResources resources,
+ String publicTenant,
+ NamespaceName ns) throws
Exception {
+ TenantResources tr = resources.getTenantResources();
+ NamespaceResources nsr = resources.getNamespaceResources();
+
+ if (!tr.tenantExists(publicTenant)) {
+ tr.createTenant(publicTenant,
+ TenantInfo.builder()
+
.adminRoles(Sets.newHashSet(conf.getSuperUserRoles()))
+
.allowedClusters(Sets.newHashSet(conf.getClusterName()))
+ .build());
+ }
+
+ if (!nsr.namespaceExists(ns)) {
+ Policies nsp = new Policies();
+ nsp.replication_clusters =
Collections.singleton(conf.getClusterName());
+ nsr.createPolicies(ns, nsp);
+ }
+ }
+
+ @Override
+ protected void cleanup() throws Exception {
+ pulsar1 = null;
+ pulsar2.close();
+ super.internalCleanup();
+ this.additionalPulsarTestContext.close();
+ }
+
+ @BeforeMethod
+ protected void initializeState() throws IllegalAccessException {
+ reset(primaryLoadManager, secondaryLoadManager);
+ cleanTableView(channel1);
+ cleanTableView(channel2);
+ }
+
+ @Test
+ public void testAssignInternalTopic() throws Exception {
+ Optional<BrokerLookupData> brokerLookupData1 =
primaryLoadManager.assign(
+ Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
+ getBundleAsync(pulsar1,
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get();
+ Optional<BrokerLookupData> brokerLookupData2 =
secondaryLoadManager.assign(
+ Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
+ getBundleAsync(pulsar1,
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get();
+ assertEquals(brokerLookupData1, brokerLookupData2);
+ assertTrue(brokerLookupData1.isPresent());
+
+ LeaderElectionService leaderElectionService = (LeaderElectionService)
+ FieldUtils.readField(channel1, "leaderElectionService", true);
+ Optional<LeaderBroker> currentLeader =
leaderElectionService.getCurrentLeader();
+ assertTrue(currentLeader.isPresent());
+ assertEquals(brokerLookupData1.get().getWebServiceUrl(),
currentLeader.get().getServiceUrl());
+ }
+
+ @Test
+ public void testAssign() throws Exception {
+ TopicName topicName = TopicName.get("test-assign");
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+ Optional<BrokerLookupData> brokerLookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
+ assertTrue(brokerLookupData.isPresent());
+ log.info("Assign the bundle {} to {}", bundle, brokerLookupData);
+ // Should get owner info from channel.
+ Optional<BrokerLookupData> brokerLookupData1 =
secondaryLoadManager.assign(Optional.empty(), bundle).get();
+ assertEquals(brokerLookupData, brokerLookupData1);
+
+ verify(primaryLoadManager, times(1)).getBrokerSelectionStrategy();
+ verify(secondaryLoadManager, times(0)).getBrokerSelectionStrategy();
+
+ Optional<LookupResult> lookupResult = pulsar2.getNamespaceService()
+ .getBrokerServiceUrlAsync(topicName, null).get();
+ assertTrue(lookupResult.isPresent());
+ assertEquals(lookupResult.get().getLookupData().getHttpUrl(),
brokerLookupData.get().getWebServiceUrl());
+
+ Optional<URL> webServiceUrl = pulsar2.getNamespaceService()
+ .getWebServiceUrl(bundle,
LookupOptions.builder().requestHttps(false).build());
+ assertTrue(webServiceUrl.isPresent());
+ assertEquals(webServiceUrl.get().toString(),
brokerLookupData.get().getWebServiceUrl());
+ }
+
+ @Test
+ public void testCheckOwnershipAsync() throws Exception {
+ NamespaceBundle bundle = getBundleAsync(pulsar1,
TopicName.get("test-check-ownership")).get();
+ // 1. The bundle is never assigned.
+ assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+
+ // 2. Assign the bundle to a broker.
+ Optional<BrokerLookupData> lookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
+ assertTrue(lookupData.isPresent());
+ if
(lookupData.get().getPulsarServiceUrl().equals(pulsar1.getBrokerServiceUrl())) {
+
assertTrue(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ } else {
+
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+
assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(),
bundle).get());
+ }
+
+ }
+
+ @Test
+ public void testFilter() throws Exception {
+ TopicName topicName = TopicName.get("test-filter");
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+ doReturn(List.of(new BrokerFilter() {
+ @Override
+ public String name() {
+ return "Mock broker filter";
+ }
+
+ @Override
+ public Map<String, BrokerLookupData> filter(Map<String,
BrokerLookupData> brokers,
+ LoadManagerContext
context) {
+ brokers.remove(pulsar1.getLookupServiceAddress());
+ return brokers;
+ }
+ })).when(primaryLoadManager).getBrokerFilterPipeline();
+
+ Optional<BrokerLookupData> brokerLookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
+ assertTrue(brokerLookupData.isPresent());
+ assertEquals(brokerLookupData.get().getWebServiceUrl(),
pulsar2.getWebServiceAddress());
+ }
+
+ @Test
+ public void testFilterHasException() throws Exception {
+ TopicName topicName = TopicName.get("test-filter-has-exception");
+ NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+ doReturn(List.of(new BrokerFilter() {
+ @Override
+ public String name() {
+ return "Mock broker filter";
+ }
+
+ @Override
+ public Map<String, BrokerLookupData> filter(Map<String,
BrokerLookupData> brokers,
+ LoadManagerContext
context) throws BrokerFilterException {
+ brokers.clear();
+ throw new BrokerFilterException("Test");
+ }
+ })).when(primaryLoadManager).getBrokerFilterPipeline();
+
+ Optional<BrokerLookupData> brokerLookupData =
primaryLoadManager.assign(Optional.empty(), bundle).get();
+ assertTrue(brokerLookupData.isPresent());
+ }
+
+ private static void cleanTableView(ServiceUnitStateChannel channel)
+ throws IllegalAccessException {
+ var tv = (TableViewImpl<ServiceUnitStateData>)
+ FieldUtils.readField(channel, "tableview", true);
+ var cache = (ConcurrentMap<String, ServiceUnitStateData>)
+ FieldUtils.readField(tv, "data", true);
+ cache.clear();
+ }
+
+ private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService
pulsar, TopicName topic) {
+ return pulsar.getNamespaceService().getBundleAsync(topic);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index bc85403b7cd..1c0a4f37633 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -160,7 +159,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
assertEquals(newChannelOwner1, newChannelOwner2);
assertNotEquals(channelOwner1, newChannelOwner1);
- if (newChannelOwner1.equals(lookupServiceAddress1)) {
+ if (newChannelOwner1.equals(Optional.of(lookupServiceAddress1))) {
assertTrue(channel1.isChannelOwnerAsync().get(2,
TimeUnit.SECONDS));
assertFalse(channel2.isChannelOwnerAsync().get(2,
TimeUnit.SECONDS));
} else {
@@ -282,8 +281,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
- assertNull(owner1.get());
- assertNull(owner2.get());
+ assertTrue(owner1.get().isEmpty());
+ assertTrue(owner2.get().isEmpty());
var assigned1 = channel1.publishAssignEventAsync(bundle,
lookupServiceAddress1);
var assigned2 = channel2.publishAssignEventAsync(bundle,
lookupServiceAddress2);
@@ -329,15 +328,15 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
- assertNull(owner1.get());
- assertNull(owner2.get());
+ assertTrue(owner1.get().isEmpty());
+ assertTrue(owner2.get().isEmpty());
- owner1 = channel1.publishAssignEventAsync(bundle,
lookupServiceAddress1);
- owner2 = channel2.publishAssignEventAsync(bundle,
lookupServiceAddress2);
- assertTrue(owner1.isCompletedExceptionally());
- assertNotNull(owner2);
- String ownerAddr2 = owner2.get(5, TimeUnit.SECONDS);
- assertEquals(ownerAddr2, lookupServiceAddress2);
+ var owner3 = channel1.publishAssignEventAsync(bundle,
lookupServiceAddress1);
+ var owner4 = channel2.publishAssignEventAsync(bundle,
lookupServiceAddress2);
+ assertTrue(owner3.isCompletedExceptionally());
+ assertNotNull(owner4);
+ String ownerAddrOpt2 = owner4.get(5, TimeUnit.SECONDS);
+ assertEquals(ownerAddrOpt2, lookupServiceAddress2);
waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
assertEquals(0, getOwnerRequests1.size());
assertEquals(0, getOwnerRequests2.size());
@@ -352,8 +351,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
- assertNull(owner1.get());
- assertNull(owner2.get());
+ assertTrue(owner1.get().isEmpty());
+ assertTrue(owner2.get().isEmpty());
channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
@@ -363,7 +362,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, lookupServiceAddress1);
+ assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
Unload unload = new Unload(lookupServiceAddress1, bundle,
Optional.of(lookupServiceAddress2));
channel1.publishUnloadEventAsync(unload);
@@ -374,7 +373,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
ownerAddr1 = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, lookupServiceAddress2);
+ assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
}
@Test(priority = 5)
@@ -393,7 +392,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
assertEquals(ownerAddr1, ownerAddr2);
- assertEquals(ownerAddr1, lookupServiceAddress1);
+ assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
var producer = (Producer<ServiceUnitStateData>)
FieldUtils.readDeclaredField(channel1,
"producer", true);
@@ -454,10 +453,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
- assertEquals(ownerAddr1, lookupServiceAddress1);
- assertEquals(ownerAddr2, lookupServiceAddress1);
+ assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+ assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+ assertTrue(ownerAddr1.isPresent());
- Split split = new Split(bundle, ownerAddr1, new HashMap<>());
+ Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>());
channel1.publishSplitEventAsync(split);
waitUntilNewOwner(channel1, bundle, null);
@@ -545,8 +545,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
var owner1 = channel1.getOwnerAsync(bundle1);
var owner2 = channel2.getOwnerAsync(bundle2);
- assertNull(owner1.get());
- assertNull(owner2.get());
+ assertTrue(owner1.get().isEmpty());
+ assertTrue(owner2.get().isEmpty());
String broker = lookupServiceAddress1;
channel1.publishAssignEventAsync(bundle1, broker);
@@ -701,8 +701,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
producer.newMessage().key(bundle).send();
var owner1 = channel1.getOwnerAsync(bundle);
var owner2 = channel2.getOwnerAsync(bundle);
- assertNull(owner1.get());
- assertNull(owner2.get());
+ assertTrue(owner1.get().isEmpty());
+ assertTrue(owner2.get().isEmpty());
var assigned1 = channel1.publishAssignEventAsync(bundle,
lookupServiceAddress1);
assertNotNull(assigned1);
@@ -724,8 +724,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
assertNotNull(ex);
assertEquals(TimeoutException.class, ex.getCause().getClass());
- assertEquals(lookupServiceAddress1,
channel2.getOwnerAsync(bundle).get());
- assertEquals(lookupServiceAddress1,
channel1.getOwnerAsync(bundle).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel2.getOwnerAsync(bundle).get());
+ assertEquals(Optional.of(lookupServiceAddress1),
channel1.getOwnerAsync(bundle).get());
var compactor = spy (pulsar1.getStrategicCompactor());
Field strategicCompactorField =
FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
@@ -743,7 +743,7 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(
- channel3.getOwnerAsync(bundle).get(),
lookupServiceAddress1));
+ channel3.getOwnerAsync(bundle).get(),
Optional.of(lookupServiceAddress1)));
channel3.close();
FieldUtils.writeDeclaredField(channel2,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -787,10 +787,8 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
if (!owner.isDone()) {
return false;
}
- if (oldOwner == null) {
- return owner != null;
- }
- return !oldOwner.equals(owner);
+
+ return !StringUtils.equals(oldOwner,
owner.get().orElse(null));
});
}
@@ -799,11 +797,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(() -> { // wait until true
- CompletableFuture<String> owner =
channel.getOwnerAsync(serviceUnit);
+ CompletableFuture<Optional<String>> owner =
channel.getOwnerAsync(serviceUnit);
if (!owner.isDone()) {
return false;
}
- return !StringUtils.equals(oldOwner, owner.get());
+ return !StringUtils.equals(oldOwner,
owner.get().orElse(null));
});
}
@@ -813,11 +811,11 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
.atMost(15, TimeUnit.SECONDS)
.until(() -> { // wait until true
try {
- CompletableFuture<String> owner =
channel.getOwnerAsync(serviceUnit);
+ CompletableFuture<Optional<String>> owner =
channel.getOwnerAsync(serviceUnit);
if (!owner.isDone()) {
return false;
}
- return StringUtils.equals(newOwner, owner.get());
+ return StringUtils.equals(newOwner,
owner.get().orElse(null));
} catch (Exception e) {
return false;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index ef0e65762f1..2856dde892a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -24,9 +24,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -78,7 +77,7 @@ public class LeastResourceUsageWithWeightTest {
LeastResourceUsageWithWeight strategy = new
LeastResourceUsageWithWeight();
// Should choice broker from broker1 2 3.
- List<String> candidates = new ArrayList<>();
+ Set<String> candidates = new HashSet<>();
candidates.add("1");
candidates.add("2");
candidates.add("3");
@@ -125,7 +124,7 @@ public class LeastResourceUsageWithWeightTest {
LeastResourceUsageWithWeight strategy = new
LeastResourceUsageWithWeight();
// Should choice broker from broker1 2 3.
- List<String> candidates = new ArrayList<>();
+ Set<String> candidates = new HashSet<>();
candidates.add("1");
candidates.add("2");
candidates.add("3");
@@ -141,7 +140,7 @@ public class LeastResourceUsageWithWeightTest {
LeastResourceUsageWithWeight strategy = new
LeastResourceUsageWithWeight();
- List<String> candidates = new ArrayList<>();
+ Set<String> candidates = new HashSet<>();
var brokerLoadDataStore = ctx.brokerLoadDataStore();
brokerLoadDataStore.pushAsync("1", createBrokerData(ctx,50, 100));
brokerLoadDataStore.pushAsync("2", createBrokerData(ctx,100, 100));
@@ -189,7 +188,7 @@ public class LeastResourceUsageWithWeightTest {
1, 1, 1, 1, ctx.brokerConfiguration());
}
- public LoadManagerContext getContext() {
+ public static LoadManagerContext getContext() {
var ctx = mock(LoadManagerContext.class);
var conf = new ServiceConfiguration();
conf.setLoadBalancerCPUResourceWeight(1.0);