This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c3126c684a9 [improve][broker] PIP-192: Implement extensible load 
manager (#19102)
c3126c684a9 is described below

commit c3126c684a91f2c677fbe2eb0e35037d77c4b116
Author: Kai Wang <[email protected]>
AuthorDate: Fri Feb 3 15:05:06 2023 +0800

    [improve][broker] PIP-192: Implement extensible load manager (#19102)
    
    PIP: #16691
    
    ### Motivation
    
    Implement extensible load manager.
    
    ### Modifications
    
    For the PIP-192, this PR adds `ExtensibleLoadManagerImpl` and unit tests.
    
    This PR also changes:
    1. Added `CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
                Optional<ServiceUnitId> topic, ServiceUnitId bundle)` to 
`LoadManager ` interface.
    2. Added `CompletableFuture<Boolean> 
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle)` to 
`LoadManager` interface.
    3. Change `CompletableFuture<String> getOwnerAsync(String serviceUnit)` to 
`CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit)` to 
unify the result.
---
 .../org/apache/pulsar/broker/PulsarService.java    |  26 +-
 .../pulsar/broker/loadbalance/LoadManager.java     |  18 ++
 .../extensions/ExtensibleLoadManager.java          |   9 +
 .../extensions/ExtensibleLoadManagerImpl.java      | 266 +++++++++++++++++++
 .../extensions/ExtensibleLoadManagerWrapper.java   | 146 +++++++++++
 .../extensions/LoadManagerContextImpl.java         |  60 +++++
 .../channel/ServiceUnitStateChannel.java           |   4 +-
 .../channel/ServiceUnitStateChannelImpl.java       |  13 +-
 .../extensions/filter/BrokerFilter.java            |   6 +-
 .../strategy/BrokerSelectionStrategy.java          |   4 +-
 .../strategy/LeastResourceUsageWithWeight.java     |   4 +-
 .../extensions/strategy/package-info.java          |   2 +-
 .../pulsar/broker/namespace/NamespaceService.java  |  51 +++-
 .../extensions/ExtensibleLoadManagerImplTest.java  | 287 +++++++++++++++++++++
 .../channel/ServiceUnitStateChannelTest.java       |  68 +++--
 .../strategy/LeastResourceUsageWithWeightTest.java |  11 +-
 16 files changed, 905 insertions(+), 70 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0164b51d2c2..ae0fb1a9f28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.lookup.v1.TopicLookup;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.protocol.ProtocolHandlers;
@@ -818,6 +819,17 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 this.webSocketService.setLocalCluster(clusterData);
             }
 
+            // Start the leader election service
+            startLeaderElectionService();
+
+            // By starting the Load manager service, the broker will also 
become visible
+            // to the rest of the broker by creating the registration z-node. 
This needs
+            // to be done only when the broker is fully operative.
+            //
+            // The load manager service and its service unit state channel 
need to be initialized first
+            // (namespace service depends on load manager)
+            this.startLoadManagementService();
+
             // Initialize namespace service, after service url assigned. 
Should init zk and refresh self owner info.
             this.nsService.initialize();
 
@@ -828,9 +840,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             this.topicPoliciesService.start();
 
-            // Start the leader election service
-            startLeaderElectionService();
-
             // Register heartbeat and bootstrap namespaces.
             this.nsService.registerBootstrapNamespaces();
 
@@ -859,11 +868,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
             this.metricsGenerator = new MetricsGenerator(this);
 
-            // By starting the Load manager service, the broker will also 
become visible
-            // to the rest of the broker by creating the registration z-node. 
This needs
-            // to be done only when the broker is fully operative.
-            this.startLoadManagementService();
-
             // Initialize the message protocol handlers.
             // start the protocol handlers only after the broker is ready,
             // so that the protocol handlers can access broker service 
properly.
@@ -1103,6 +1107,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     }
 
     protected void startLeaderElectionService() {
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            LOG.info("The load manager extension is enabled. Skipping 
PulsarService LeaderElectionService.");
+            return;
+        }
         this.leaderElectionService = new 
LeaderElectionService(coordinationService, getSafeWebServiceAddress(),
                 state -> {
                     if (state == LeaderElectionState.Leading) {
@@ -1207,7 +1215,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         LOG.info("Starting load management service ...");
         this.loadManager.get().start();
 
-        if (config.isLoadBalancerEnabled()) {
+        if (config.isLoadBalancerEnabled() && 
!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
             LOG.info("Starting load balancer");
             if (this.loadReportTask == null) {
                 long loadReportMinInterval = 
config.getLoadBalancerReportUpdateMinIntervalMillis();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index b4df5d31968..17bff57b85c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -25,8 +25,12 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManager;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.Reflections;
@@ -58,6 +62,15 @@ public interface LoadManager {
      */
     Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;
 
+    default CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        throw new UnsupportedOperationException();
+    }
+
+    default CompletableFuture<Boolean> 
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Generate the load report.
      */
@@ -145,6 +158,11 @@ public interface LoadManager {
                 final LoadManager casted = new 
ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
                 casted.initialize(pulsar);
                 return casted;
+            } else if (loadManagerInstance instanceof ExtensibleLoadManager) {
+                final LoadManager casted =
+                        new 
ExtensibleLoadManagerWrapper((ExtensibleLoadManagerImpl) loadManagerInstance);
+                casted.initialize(pulsar);
+                return casted;
             }
         } catch (Exception e) {
             LOG.warn("Error when trying to create load manager: ", e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
index bb66bf731f4..b7da70d1cf1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManager.java
@@ -64,6 +64,15 @@ public interface ExtensibleLoadManager extends Closeable {
      */
     CompletableFuture<Optional<BrokerLookupData>> 
assign(Optional<ServiceUnitId> topic, ServiceUnitId serviceUnit);
 
+    /**
+     * Check the incoming service unit is owned by the current broker.
+     *
+     * @param topic The optional topic, some method won't provide topic var in 
this param.
+     * @param serviceUnit The service unit (e.g. bundle).
+     * @return The broker lookup data.
+     */
+    CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> 
topic, ServiceUnitId serviceUnit);
+
     /**
      * Close the load manager.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
new file mode 100644
index 00000000000..d95bacd157e
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
+import 
org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
+import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
+import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+
+@Slf4j
+public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
+
+    public static final String BROKER_LOAD_DATA_STORE_TOPIC = TopicName.get(
+            TopicDomain.non_persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE,
+            "loadbalancer-broker-load-data").toString();
+
+    public static final String TOP_BUNDLES_LOAD_DATA_STORE_TOPIC = 
TopicName.get(
+            TopicDomain.non_persistent.value(),
+            NamespaceName.SYSTEM_NAMESPACE,
+            "loadbalancer-top-bundles-load-data").toString();
+
+    private PulsarService pulsar;
+
+    private ServiceConfiguration conf;
+
+    @Getter
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceUnitStateChannel serviceUnitStateChannel;
+
+    private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
+    private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
+
+    @Getter
+    private LoadManagerContext context;
+
+    @Getter
+    private final BrokerSelectionStrategy brokerSelectionStrategy;
+
+    @Getter
+    private final List<BrokerFilter> brokerFilterPipeline;
+
+    private boolean started = false;
+
+    private final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<BrokerLookupData>>>
+            lookupRequests = ConcurrentOpenHashMap.<String,
+                    CompletableFuture<Optional<BrokerLookupData>>>newBuilder()
+            .build();
+
+    /**
+     * Life cycle: Constructor -> initialize -> start -> close.
+     */
+    public ExtensibleLoadManagerImpl() {
+        this.brokerFilterPipeline = new ArrayList<>();
+        // TODO: Make brokerSelectionStrategy configurable.
+        this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
+    }
+
+    public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration 
conf) {
+        return 
ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        if (this.started) {
+            return;
+        }
+        this.brokerRegistry = new BrokerRegistryImpl(pulsar);
+        this.serviceUnitStateChannel = new ServiceUnitStateChannelImpl(pulsar);
+        this.brokerRegistry.start();
+        this.serviceUnitStateChannel.start();
+
+        try {
+            this.brokerLoadDataStore = LoadDataStoreFactory
+                    .create(pulsar.getClient(), BROKER_LOAD_DATA_STORE_TOPIC, 
BrokerLoadData.class);
+            this.topBundlesLoadDataStore = LoadDataStoreFactory
+                    .create(pulsar.getClient(), 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class);
+        } catch (LoadDataStoreException e) {
+            throw new PulsarServerException(e);
+        }
+
+        this.context = LoadManagerContextImpl.builder()
+                .configuration(conf)
+                .brokerRegistry(brokerRegistry)
+                .brokerLoadDataStore(brokerLoadDataStore)
+                .topBundleLoadDataStore(topBundlesLoadDataStore).build();
+        // TODO: Start load data reporter.
+
+        // TODO: Start unload scheduler and bundle split scheduler
+        this.started = true;
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.conf = pulsar.getConfiguration();
+    }
+
+    @Override
+    public CompletableFuture<Optional<BrokerLookupData>> 
assign(Optional<ServiceUnitId> topic,
+                                                                ServiceUnitId 
serviceUnit) {
+
+        final String bundle = serviceUnit.toString();
+
+        CompletableFuture<Optional<BrokerLookupData>> future = 
lookupRequests.computeIfAbsent(bundle, k -> {
+            final CompletableFuture<Optional<String>> owner;
+            // Assign the bundle to channel owner if is internal topic, to 
avoid circular references.
+            if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+                owner = serviceUnitStateChannel.getChannelOwnerAsync();
+            } else {
+                owner = 
serviceUnitStateChannel.getOwnerAsync(bundle).thenCompose(broker -> {
+                    // If the bundle not assign yet, select and publish assign 
event to channel.
+                    if (broker.isEmpty()) {
+                        return 
this.selectAsync(serviceUnit).thenCompose(brokerOpt -> {
+                            if (brokerOpt.isPresent()) {
+                                log.info("Selected new owner broker: {} for 
bundle: {}.", brokerOpt.get(), bundle);
+                                return 
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
+                                        .thenApply(Optional::of);
+                            } else {
+                                throw new IllegalStateException(
+                                        "Failed to select the new owner broker 
for bundle: " + bundle);
+                            }
+                        });
+                    }
+                    // Already assigned, return it.
+                    return CompletableFuture.completedFuture(broker);
+                });
+            }
+
+            return owner.thenCompose(broker -> {
+                if (broker.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for 
bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(broker.get());
+            }).thenCompose(broker -> 
this.getBrokerRegistry().lookupAsync(broker).thenCompose(brokerLookupData -> {
+                if (brokerLookupData.isEmpty()) {
+                    String errorMsg = String.format(
+                            "Failed to look up a broker registry:%s for 
bundle:%s", broker, bundle);
+                    log.error(errorMsg);
+                    throw new IllegalStateException(errorMsg);
+                }
+                return CompletableFuture.completedFuture(brokerLookupData);
+            }));
+        });
+        future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+        return future;
+    }
+
+    private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle) {
+        BrokerRegistry brokerRegistry = getBrokerRegistry();
+        return brokerRegistry.getAvailableBrokerLookupDataAsync()
+                .thenCompose(availableBrokers -> {
+                    // TODO: Support isolation policies
+                    LoadManagerContext context = this.getContext();
+
+                    Map<String, BrokerLookupData> availableBrokerCandidates = 
new HashMap<>(availableBrokers);
+
+                    // Filter out brokers that do not meet the rules.
+                    List<BrokerFilter> filterPipeline = 
getBrokerFilterPipeline();
+                    for (final BrokerFilter filter : filterPipeline) {
+                        try {
+                            filter.filter(availableBrokerCandidates, context);
+                        } catch (BrokerFilterException e) {
+                            // TODO: We may need to revisit this error case.
+                            log.error("Failed to filter out brokers.", e);
+                            availableBrokerCandidates = availableBrokers;
+                        }
+                    }
+                    if (availableBrokerCandidates.isEmpty()) {
+                        return 
CompletableFuture.completedFuture(Optional.empty());
+                    }
+                    Set<String> candidateBrokers = 
availableBrokerCandidates.keySet();
+
+                    return CompletableFuture.completedFuture(
+                            
getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
+                });
+    }
+
+    @Override
+    public CompletableFuture<Boolean> 
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundleUnit) {
+        final String bundle = bundleUnit.toString();
+        CompletableFuture<Optional<String>> owner;
+        if (topic.isPresent() && isInternalTopic(topic.get().toString())) {
+            owner = serviceUnitStateChannel.getChannelOwnerAsync();
+        } else {
+            owner = serviceUnitStateChannel.getOwnerAsync(bundle);
+        }
+
+        return owner.thenApply(broker -> 
brokerRegistry.getBrokerId().equals(broker.orElse(null)));
+    }
+
+    @Override
+    public void close() throws PulsarServerException {
+        if (!this.started) {
+            return;
+        }
+        try {
+            this.brokerLoadDataStore.close();
+            this.topBundlesLoadDataStore.close();
+        } catch (IOException ex) {
+            throw new PulsarServerException(ex);
+        } finally {
+            try {
+                this.brokerRegistry.close();
+            } finally {
+                try {
+                    this.serviceUnitStateChannel.close();
+                } finally {
+                    this.started = false;
+                }
+            }
+        }
+    }
+
+    private boolean isInternalTopic(String topic) {
+        return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
+                || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
+                || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
new file mode 100644
index 00000000000..6d5797eed66
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
+
+public class ExtensibleLoadManagerWrapper implements LoadManager {
+
+    private PulsarService pulsar;
+
+    private final ExtensibleLoadManagerImpl loadManager;
+
+    public ExtensibleLoadManagerWrapper(ExtensibleLoadManagerImpl loadManager) 
{
+        this.loadManager = loadManager;
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        loadManager.start();
+    }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        loadManager.initialize(pulsar);
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public boolean isCentralized() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(
+            Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.assign(topic, bundle)
+                .thenApply(lookupData -> 
lookupData.map(BrokerLookupData::toLookupResult));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> 
checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle) {
+        return loadManager.checkOwnershipAsync(topic, bundle);
+    }
+
+    @Override
+    public void disableBroker() throws Exception {
+        this.loadManager.getBrokerRegistry().unregister();
+    }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return getAvailableBrokersAsync()
+                
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
+        return 
this.loadManager.getBrokerRegistry().getAvailableBrokersAsync().thenApply(HashSet::new);
+    }
+
+    @Override
+    public String setNamespaceBundleAffinity(String bundle, String broker) {
+        // TODO: Add namespace bundle affinity support.
+        return null;
+    }
+
+    @Override
+    public void stop() throws PulsarServerException {
+        this.loadManager.close();
+    }
+
+
+    @Override
+    public Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws 
Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public LoadManagerReport generateLoadReport() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setLoadReportForceUpdateFlag() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void writeLoadReportOnZookeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will 
automatically write.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void writeResourceQuotasToZooKeeper() throws Exception {
+        // No-op, this operation is not useful, the load data reporter will 
automatically write.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Metrics> getLoadBalancingMetrics() {
+        // TODO: Add metrics.
+        return null;
+    }
+
+    @Override
+    public void doLoadShedding() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void doNamespaceBundleSplit() {
+        throw new UnsupportedOperationException();
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java
new file mode 100644
index 00000000000..5f78b88e22c
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerContextImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import lombok.Builder;
+import lombok.Setter;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
+
+@Setter
+@Builder
+public class LoadManagerContextImpl implements LoadManagerContext {
+
+    private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
+
+    private LoadDataStore<TopBundlesLoadData> topBundleLoadDataStore;
+
+    private BrokerRegistry brokerRegistry;
+
+    private ServiceConfiguration configuration;
+
+
+    @Override
+    public ServiceConfiguration brokerConfiguration() {
+        return this.configuration;
+    }
+
+    @Override
+    public LoadDataStore<BrokerLoadData> brokerLoadDataStore() {
+        return this.brokerLoadDataStore;
+    }
+
+    @Override
+    public LoadDataStore<TopBundlesLoadData> topBundleLoadDataStore() {
+        return this.topBundleLoadDataStore;
+    }
+
+    @Override
+    public BrokerRegistry brokerRegistry() {
+        return this.brokerRegistry;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
index 238c433e177..fece425e75f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
@@ -116,9 +116,9 @@ public interface ServiceUnitStateChannel extends Closeable {
      *                 the future object will complete and return the owner 
broker.
      *      Sub-case2: If the assigned broker does not take the ownership in 
time,
      *                 the future object will time out.
-     * Case 3: If none of them, it returns null.
+     * Case 3: If none of them, it returns Optional.empty().
      */
-    CompletableFuture<String> getOwnerAsync(String serviceUnit);
+    CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit);
 
     /**
      * Asynchronously publishes the service unit assignment event to the 
system topic in this channel.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 38e8afa50f3..37dfe6090bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -63,6 +63,7 @@ import 
org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
@@ -276,21 +277,23 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         }
     }
 
-    public CompletableFuture<String> getOwnerAsync(String serviceUnit) {
+    public CompletableFuture<Optional<String>> getOwnerAsync(String 
serviceUnit) {
         validateChannelState(Started, true);
         ServiceUnitStateData data = tableview.get(serviceUnit);
         if (data == null) {
-            return CompletableFuture.completedFuture(null);
+            return CompletableFuture.completedFuture(Optional.empty());
         }
         switch (data.state()) {
             case Owned, Splitting -> {
-                return CompletableFuture.completedFuture(data.broker());
+                return 
CompletableFuture.completedFuture(Optional.of(data.broker()));
             }
             case Assigned, Released -> {
-                return deferGetOwnerRequest(serviceUnit);
+                return 
deferGetOwnerRequest(serviceUnit).thenApply(Optional::of);
             }
             default -> {
-                return null;
+                String errorMsg = String.format("Failed to process service 
unit state data: %s when get owner.", data);
+                log.error(errorMsg);
+                return FutureUtil.failedFuture(new 
IllegalStateException(errorMsg));
             }
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 4adc6aa1ce4..0a76446d3ce 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.filter;
 
-import java.util.List;
+import java.util.Map;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 
 /**
  * Filter out unqualified Brokers, which are not entered into LoadBalancer for 
decision-making.
@@ -39,6 +40,7 @@ public interface BrokerFilter {
      * @param context The load manager context.
      * @return Filtered broker list.
      */
-    List<String> filter(List<String> brokers, LoadManagerContext context) 
throws BrokerFilterException;
+    Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers, LoadManagerContext context)
+            throws BrokerFilterException;
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
index 3b62777f1b6..e0a9122383c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/BrokerSelectionStrategy.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.strategy;
 
-import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.common.naming.ServiceUnitId;
 
@@ -38,6 +38,6 @@ public interface BrokerSelectionStrategy {
      * @param context
      *               The context contains information needed for selection 
(load data, config, and etc).
      */
-    Optional<String> select(List<String> brokers, ServiceUnitId bundle, 
LoadManagerContext context);
+    Optional<String> select(Set<String> brokers, ServiceUnitId bundle, 
LoadManagerContext context);
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
index f48bab54f89..678927dac92 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
@@ -20,7 +20,6 @@ package 
org.apache.pulsar.broker.loadbalance.extensions.strategy;
 
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
@@ -79,7 +78,8 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
      * @return The name of the selected broker as it appears on ZooKeeper.
      */
     @Override
-    public Optional<String> select(List<String> candidates, ServiceUnitId 
bundleToAssign, LoadManagerContext context) {
+    public Optional<String> select(
+            Set<String> candidates, ServiceUnitId bundleToAssign, 
LoadManagerContext context) {
         var conf = context.brokerConfiguration();
         if (candidates.isEmpty()) {
             log.info("There are no available brokers as candidates at this 
point for bundle: {}", bundleToAssign);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
index d2349768352..846b528045a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.loadbalance.extensions.strategy;
\ No newline at end of file
+package org.apache.pulsar.broker.loadbalance.extensions.strategy;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 84bce75bf5a..abbabcd3b00 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -175,7 +176,14 @@ public class NamespaceService implements AutoCloseable {
         long startTime = System.nanoTime();
 
         CompletableFuture<Optional<LookupResult>> future = 
getBundleAsync(topic)
-                .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
+                .thenCompose(bundle -> {
+                    if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+                        return 
loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle);
+                    } else {
+                        // TODO: Add unit tests cover it.
+                        return findBrokerServiceUrl(bundle, options);
+                    }
+                });
 
         future.thenAccept(optResult -> {
             lookupLatency.observe(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS);
@@ -232,16 +240,18 @@ public class NamespaceService implements AutoCloseable {
                 LOG.debug("Getting web service URL of topic: {} - options: 
{}", name, options);
             }
             return getBundleAsync(name)
-                    .thenCompose(namespaceBundle -> 
internalGetWebServiceUrl(namespaceBundle, options));
+                    .thenCompose(namespaceBundle ->
+                            internalGetWebServiceUrl(Optional.of(name), 
namespaceBundle, options));
         }
 
         if (suName instanceof NamespaceName) {
             return getFullBundleAsync((NamespaceName) suName)
-                    .thenCompose(namespaceBundle -> 
internalGetWebServiceUrl(namespaceBundle, options));
+                    .thenCompose(namespaceBundle ->
+                            internalGetWebServiceUrl(Optional.empty(), 
namespaceBundle, options));
         }
 
         if (suName instanceof NamespaceBundle) {
-            return internalGetWebServiceUrl((NamespaceBundle) suName, options);
+            return internalGetWebServiceUrl(Optional.empty(), 
(NamespaceBundle) suName, options);
         }
 
         throw new IllegalArgumentException("Unrecognized class of 
NamespaceBundle: " + suName.getClass().getName());
@@ -257,9 +267,14 @@ public class NamespaceService implements AutoCloseable {
                 
.get(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
SECONDS);
     }
 
-    private CompletableFuture<Optional<URL>> 
internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {
+    private CompletableFuture<Optional<URL>> 
internalGetWebServiceUrl(Optional<ServiceUnitId> topic,
+                                                                      
NamespaceBundle bundle,
+                                                                      
LookupOptions options) {
 
-        return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> 
{
+        return (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)
+                ? loadManager.get().findBrokerServiceUrl(topic, bundle) :
+                // TODO: Add unit tests cover it.
+                findBrokerServiceUrl(bundle, options)).thenApply(lookupResult 
-> {
             if (lookupResult.isPresent()) {
                 try {
                     LookupData lookupData = lookupResult.get().getLookupData();
@@ -1024,6 +1039,10 @@ public class NamespaceService implements AutoCloseable {
         }
 
         if (suName instanceof NamespaceBundle) {
+            if 
(ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+                return loadManager.get().checkOwnershipAsync(Optional.empty(), 
suName);
+            }
+            // TODO: Add unit tests cover it.
             return CompletableFuture.completedFuture(
                     ownershipCache.isNamespaceBundleOwned((NamespaceBundle) 
suName));
         }
@@ -1046,6 +1065,11 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Boolean> isServiceUnitActiveAsync(TopicName 
topicName) {
+        // TODO: Add unit tests cover it.
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            return getBundleAsync(topicName)
+                    .thenCompose(bundle -> 
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
+        }
         Optional<CompletableFuture<OwnedBundle>> res = 
ownershipCache.getOwnedBundleAsync(getBundle(topicName));
         if (!res.isPresent()) {
             return CompletableFuture.completedFuture(false);
@@ -1059,15 +1083,30 @@ public class NamespaceService implements AutoCloseable {
     }
 
     private CompletableFuture<Boolean> isNamespaceOwnedAsync(NamespaceName 
fqnn) {
+        // TODO: Add unit tests cover it.
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            return getFullBundleAsync(fqnn)
+                    .thenCompose(bundle -> 
loadManager.get().checkOwnershipAsync(Optional.empty(), bundle));
+        }
         return getFullBundleAsync(fqnn)
                 .thenApply(bundle -> ownershipCache.getOwnedBundle(bundle) != 
null);
     }
 
     private CompletableFuture<Boolean> isTopicOwnedAsync(TopicName topic) {
+        // TODO: Add unit tests cover it.
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            return getBundleAsync(topic)
+                    .thenCompose(bundle -> 
loadManager.get().checkOwnershipAsync(Optional.of(topic), bundle));
+        }
         return getBundleAsync(topic).thenApply(bundle -> 
ownershipCache.isNamespaceBundleOwned(bundle));
     }
 
     public CompletableFuture<Boolean> checkTopicOwnership(TopicName topicName) 
{
+        // TODO: Add unit tests cover it.
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
+            return getBundleAsync(topicName)
+                    .thenCompose(bundle -> 
loadManager.get().checkOwnershipAsync(Optional.of(topicName), bundle));
+        }
         return getBundleAsync(topicName)
                 .thenCompose(ownershipCache::checkOwnershipAsync);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
new file mode 100644
index 00000000000..d650567be8b
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.LeaderBroker;
+import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.namespace.LookupOptions;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.impl.TableViewImpl;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link ExtensibleLoadManagerImpl}.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest 
{
+
+    private PulsarService pulsar1;
+    private PulsarService pulsar2;
+
+    private PulsarTestContext additionalPulsarTestContext;
+
+    private PulsarResources resources;
+
+    private ExtensibleLoadManagerImpl primaryLoadManager;
+
+    private ExtensibleLoadManagerImpl secondaryLoadManager;
+
+    private ServiceUnitStateChannelImpl channel1;
+    private ServiceUnitStateChannelImpl channel2;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        conf.setAllowAutoTopicCreation(true);
+        
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        super.internalSetup(conf);
+        pulsar1 = pulsar;
+        ServiceConfiguration defaultConf = getDefaultConf();
+        
defaultConf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
+        additionalPulsarTestContext = 
createAdditionalPulsarTestContext(defaultConf);
+        pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+        ExtensibleLoadManagerWrapper primaryLoadManagerWrapper =
+                (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
+        primaryLoadManager = spy((ExtensibleLoadManagerImpl)
+                FieldUtils.readField(primaryLoadManagerWrapper, "loadManager", 
true));
+        FieldUtils.writeField(primaryLoadManagerWrapper, "loadManager", 
primaryLoadManager, true);
+
+        ExtensibleLoadManagerWrapper secondaryLoadManagerWrapper =
+                (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
+        secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
+                FieldUtils.readField(secondaryLoadManagerWrapper, 
"loadManager", true));
+        FieldUtils.writeField(secondaryLoadManagerWrapper, "loadManager", 
secondaryLoadManager, true);
+
+        channel1 = (ServiceUnitStateChannelImpl)
+                FieldUtils.readField(primaryLoadManager, 
"serviceUnitStateChannel", true);
+        channel2 = (ServiceUnitStateChannelImpl)
+                FieldUtils.readField(secondaryLoadManager, 
"serviceUnitStateChannel", true);
+
+    }
+
+    protected void beforePulsarStart(PulsarService pulsar) throws Exception {
+        if (resources == null) {
+            MetadataStoreExtended localStore = 
pulsar.createLocalMetadataStore(null);
+            MetadataStoreExtended configStore = (MetadataStoreExtended) 
pulsar.createConfigurationMetadataStore(null);
+            resources = new PulsarResources(localStore, configStore);
+        }
+        this.createNamespaceIfNotExists(resources, 
NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                NamespaceName.SYSTEM_NAMESPACE);
+    }
+
+    protected void createNamespaceIfNotExists(PulsarResources resources,
+                                              String publicTenant,
+                                              NamespaceName ns) throws 
Exception {
+        TenantResources tr = resources.getTenantResources();
+        NamespaceResources nsr = resources.getNamespaceResources();
+
+        if (!tr.tenantExists(publicTenant)) {
+            tr.createTenant(publicTenant,
+                    TenantInfo.builder()
+                            
.adminRoles(Sets.newHashSet(conf.getSuperUserRoles()))
+                            
.allowedClusters(Sets.newHashSet(conf.getClusterName()))
+                            .build());
+        }
+
+        if (!nsr.namespaceExists(ns)) {
+            Policies nsp = new Policies();
+            nsp.replication_clusters = 
Collections.singleton(conf.getClusterName());
+            nsr.createPolicies(ns, nsp);
+        }
+    }
+
+    @Override
+    protected void cleanup() throws Exception {
+        pulsar1 = null;
+        pulsar2.close();
+        super.internalCleanup();
+        this.additionalPulsarTestContext.close();
+    }
+
+    @BeforeMethod
+    protected void initializeState() throws IllegalAccessException {
+        reset(primaryLoadManager, secondaryLoadManager);
+        cleanTableView(channel1);
+        cleanTableView(channel2);
+    }
+
+    @Test
+    public void testAssignInternalTopic() throws Exception {
+        Optional<BrokerLookupData> brokerLookupData1 = 
primaryLoadManager.assign(
+                Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
+                getBundleAsync(pulsar1, 
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get();
+        Optional<BrokerLookupData> brokerLookupData2 = 
secondaryLoadManager.assign(
+                Optional.of(TopicName.get(ServiceUnitStateChannelImpl.TOPIC)),
+                getBundleAsync(pulsar1, 
TopicName.get(ServiceUnitStateChannelImpl.TOPIC)).get()).get();
+        assertEquals(brokerLookupData1, brokerLookupData2);
+        assertTrue(brokerLookupData1.isPresent());
+
+        LeaderElectionService leaderElectionService = (LeaderElectionService)
+                FieldUtils.readField(channel1, "leaderElectionService", true);
+        Optional<LeaderBroker> currentLeader = 
leaderElectionService.getCurrentLeader();
+        assertTrue(currentLeader.isPresent());
+        assertEquals(brokerLookupData1.get().getWebServiceUrl(), 
currentLeader.get().getServiceUrl());
+    }
+
+    @Test
+    public void testAssign() throws Exception {
+        TopicName topicName = TopicName.get("test-assign");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        assertTrue(brokerLookupData.isPresent());
+        log.info("Assign the bundle {} to {}", bundle, brokerLookupData);
+        // Should get owner info from channel.
+        Optional<BrokerLookupData> brokerLookupData1 = 
secondaryLoadManager.assign(Optional.empty(), bundle).get();
+        assertEquals(brokerLookupData, brokerLookupData1);
+
+        verify(primaryLoadManager, times(1)).getBrokerSelectionStrategy();
+        verify(secondaryLoadManager, times(0)).getBrokerSelectionStrategy();
+
+        Optional<LookupResult> lookupResult = pulsar2.getNamespaceService()
+                .getBrokerServiceUrlAsync(topicName, null).get();
+        assertTrue(lookupResult.isPresent());
+        assertEquals(lookupResult.get().getLookupData().getHttpUrl(), 
brokerLookupData.get().getWebServiceUrl());
+
+        Optional<URL> webServiceUrl = pulsar2.getNamespaceService()
+                .getWebServiceUrl(bundle, 
LookupOptions.builder().requestHttps(false).build());
+        assertTrue(webServiceUrl.isPresent());
+        assertEquals(webServiceUrl.get().toString(), 
brokerLookupData.get().getWebServiceUrl());
+    }
+
+    @Test
+    public void testCheckOwnershipAsync() throws Exception {
+        NamespaceBundle bundle = getBundleAsync(pulsar1, 
TopicName.get("test-check-ownership")).get();
+        // 1. The bundle is never assigned.
+        assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+        assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+
+        // 2. Assign the bundle to a broker.
+        Optional<BrokerLookupData> lookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        assertTrue(lookupData.isPresent());
+        if 
(lookupData.get().getPulsarServiceUrl().equals(pulsar1.getBrokerServiceUrl())) {
+            
assertTrue(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+            
assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+        } else {
+            
assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+            
assertTrue(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
+        }
+
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+        TopicName topicName = TopicName.get("test-filter");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        doReturn(List.of(new BrokerFilter() {
+            @Override
+            public String name() {
+                return "Mock broker filter";
+            }
+
+            @Override
+            public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> brokers,
+                                                        LoadManagerContext 
context) {
+                brokers.remove(pulsar1.getLookupServiceAddress());
+                return brokers;
+            }
+        })).when(primaryLoadManager).getBrokerFilterPipeline();
+
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        assertTrue(brokerLookupData.isPresent());
+        assertEquals(brokerLookupData.get().getWebServiceUrl(), 
pulsar2.getWebServiceAddress());
+    }
+
+    @Test
+    public void testFilterHasException() throws Exception {
+        TopicName topicName = TopicName.get("test-filter-has-exception");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        doReturn(List.of(new BrokerFilter() {
+            @Override
+            public String name() {
+                return "Mock broker filter";
+            }
+
+            @Override
+            public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> brokers,
+                                                        LoadManagerContext 
context) throws BrokerFilterException {
+                brokers.clear();
+                throw new BrokerFilterException("Test");
+            }
+        })).when(primaryLoadManager).getBrokerFilterPipeline();
+
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        assertTrue(brokerLookupData.isPresent());
+    }
+
+    private static void cleanTableView(ServiceUnitStateChannel channel)
+            throws IllegalAccessException {
+        var tv = (TableViewImpl<ServiceUnitStateData>)
+                FieldUtils.readField(channel, "tableview", true);
+        var cache = (ConcurrentMap<String, ServiceUnitStateData>)
+                FieldUtils.readField(tv, "data", true);
+        cache.clear();
+    }
+
+    private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService 
pulsar, TopicName topic) {
+        return pulsar.getNamespaceService().getBundleAsync(topic);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index bc85403b7cd..1c0a4f37633 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -160,7 +159,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(newChannelOwner1, newChannelOwner2);
         assertNotEquals(channelOwner1, newChannelOwner1);
 
-        if (newChannelOwner1.equals(lookupServiceAddress1)) {
+        if (newChannelOwner1.equals(Optional.of(lookupServiceAddress1))) {
             assertTrue(channel1.isChannelOwnerAsync().get(2, 
TimeUnit.SECONDS));
             assertFalse(channel2.isChannelOwnerAsync().get(2, 
TimeUnit.SECONDS));
         } else {
@@ -282,8 +281,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
 
-        assertNull(owner1.get());
-        assertNull(owner2.get());
+        assertTrue(owner1.get().isEmpty());
+        assertTrue(owner2.get().isEmpty());
 
         var assigned1 = channel1.publishAssignEventAsync(bundle, 
lookupServiceAddress1);
         var assigned2 = channel2.publishAssignEventAsync(bundle, 
lookupServiceAddress2);
@@ -329,15 +328,15 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
 
-        assertNull(owner1.get());
-        assertNull(owner2.get());
+        assertTrue(owner1.get().isEmpty());
+        assertTrue(owner2.get().isEmpty());
 
-        owner1 = channel1.publishAssignEventAsync(bundle, 
lookupServiceAddress1);
-        owner2 = channel2.publishAssignEventAsync(bundle, 
lookupServiceAddress2);
-        assertTrue(owner1.isCompletedExceptionally());
-        assertNotNull(owner2);
-        String ownerAddr2 = owner2.get(5, TimeUnit.SECONDS);
-        assertEquals(ownerAddr2, lookupServiceAddress2);
+        var owner3 = channel1.publishAssignEventAsync(bundle, 
lookupServiceAddress1);
+        var owner4 = channel2.publishAssignEventAsync(bundle, 
lookupServiceAddress2);
+        assertTrue(owner3.isCompletedExceptionally());
+        assertNotNull(owner4);
+        String ownerAddrOpt2 = owner4.get(5, TimeUnit.SECONDS);
+        assertEquals(ownerAddrOpt2, lookupServiceAddress2);
         waitUntilNewOwner(channel1, bundle, lookupServiceAddress2);
         assertEquals(0, getOwnerRequests1.size());
         assertEquals(0, getOwnerRequests2.size());
@@ -352,8 +351,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
 
-        assertNull(owner1.get());
-        assertNull(owner2.get());
+        assertTrue(owner1.get().isEmpty());
+        assertTrue(owner2.get().isEmpty());
 
 
         channel1.publishAssignEventAsync(bundle, lookupServiceAddress1);
@@ -363,7 +362,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, lookupServiceAddress1);
+        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
 
         Unload unload = new Unload(lookupServiceAddress1, bundle, 
Optional.of(lookupServiceAddress2));
         channel1.publishUnloadEventAsync(unload);
@@ -374,7 +373,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         ownerAddr1 = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
         ownerAddr2 = channel2.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS);
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, lookupServiceAddress2);
+        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress2));
     }
 
     @Test(priority = 5)
@@ -393,7 +392,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
 
         assertEquals(ownerAddr1, ownerAddr2);
-        assertEquals(ownerAddr1, lookupServiceAddress1);
+        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
 
         var producer = (Producer<ServiceUnitStateData>) 
FieldUtils.readDeclaredField(channel1,
                 "producer", true);
@@ -454,10 +453,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         waitUntilNewOwner(channel2, bundle, lookupServiceAddress1);
         var ownerAddr1 = channel1.getOwnerAsync(bundle).get();
         var ownerAddr2 = channel2.getOwnerAsync(bundle).get();
-        assertEquals(ownerAddr1, lookupServiceAddress1);
-        assertEquals(ownerAddr2, lookupServiceAddress1);
+        assertEquals(ownerAddr1, Optional.of(lookupServiceAddress1));
+        assertEquals(ownerAddr2, Optional.of(lookupServiceAddress1));
+        assertTrue(ownerAddr1.isPresent());
 
-        Split split = new Split(bundle, ownerAddr1, new HashMap<>());
+        Split split = new Split(bundle, ownerAddr1.get(), new HashMap<>());
         channel1.publishSplitEventAsync(split);
 
         waitUntilNewOwner(channel1, bundle, null);
@@ -545,8 +545,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         var owner1 = channel1.getOwnerAsync(bundle1);
         var owner2 = channel2.getOwnerAsync(bundle2);
 
-        assertNull(owner1.get());
-        assertNull(owner2.get());
+        assertTrue(owner1.get().isEmpty());
+        assertTrue(owner2.get().isEmpty());
 
         String broker = lookupServiceAddress1;
         channel1.publishAssignEventAsync(bundle1, broker);
@@ -701,8 +701,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         producer.newMessage().key(bundle).send();
         var owner1 = channel1.getOwnerAsync(bundle);
         var owner2 = channel2.getOwnerAsync(bundle);
-        assertNull(owner1.get());
-        assertNull(owner2.get());
+        assertTrue(owner1.get().isEmpty());
+        assertTrue(owner2.get().isEmpty());
 
         var assigned1 = channel1.publishAssignEventAsync(bundle, 
lookupServiceAddress1);
         assertNotNull(assigned1);
@@ -724,8 +724,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         }
         assertNotNull(ex);
         assertEquals(TimeoutException.class, ex.getCause().getClass());
-        assertEquals(lookupServiceAddress1, 
channel2.getOwnerAsync(bundle).get());
-        assertEquals(lookupServiceAddress1, 
channel1.getOwnerAsync(bundle).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel2.getOwnerAsync(bundle).get());
+        assertEquals(Optional.of(lookupServiceAddress1), 
channel1.getOwnerAsync(bundle).get());
 
         var compactor = spy (pulsar1.getStrategicCompactor());
         Field strategicCompactorField = 
FieldUtils.getDeclaredField(PulsarService.class, "strategicCompactor", true);
@@ -743,7 +743,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 .pollInterval(200, TimeUnit.MILLISECONDS)
                 .atMost(5, TimeUnit.SECONDS)
                 .untilAsserted(() -> assertEquals(
-                        channel3.getOwnerAsync(bundle).get(), 
lookupServiceAddress1));
+                        channel3.getOwnerAsync(bundle).get(), 
Optional.of(lookupServiceAddress1)));
         channel3.close();
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 30 * 1000, true);
@@ -787,10 +787,8 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                     if (!owner.isDone()) {
                         return false;
                     }
-                    if (oldOwner == null) {
-                        return owner != null;
-                    }
-                    return !oldOwner.equals(owner);
+
+                    return !StringUtils.equals(oldOwner, 
owner.get().orElse(null));
                 });
     }
 
@@ -799,11 +797,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 .pollInterval(200, TimeUnit.MILLISECONDS)
                 .atMost(10, TimeUnit.SECONDS)
                 .until(() -> { // wait until true
-                    CompletableFuture<String> owner = 
channel.getOwnerAsync(serviceUnit);
+                    CompletableFuture<Optional<String>> owner = 
channel.getOwnerAsync(serviceUnit);
                     if (!owner.isDone()) {
                         return false;
                     }
-                    return !StringUtils.equals(oldOwner, owner.get());
+                    return !StringUtils.equals(oldOwner, 
owner.get().orElse(null));
                 });
     }
 
@@ -813,11 +811,11 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 .atMost(15, TimeUnit.SECONDS)
                 .until(() -> { // wait until true
                     try {
-                        CompletableFuture<String> owner = 
channel.getOwnerAsync(serviceUnit);
+                        CompletableFuture<Optional<String>> owner = 
channel.getOwnerAsync(serviceUnit);
                         if (!owner.isDone()) {
                             return false;
                         }
-                        return StringUtils.equals(newOwner, owner.get());
+                        return StringUtils.equals(newOwner, 
owner.get().orElse(null));
                     } catch (Exception e) {
                         return false;
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
index ef0e65762f1..2856dde892a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java
@@ -24,9 +24,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -78,7 +77,7 @@ public class LeastResourceUsageWithWeightTest {
         LeastResourceUsageWithWeight strategy = new 
LeastResourceUsageWithWeight();
 
         // Should choice broker from broker1 2 3.
-        List<String> candidates = new ArrayList<>();
+        Set<String> candidates = new HashSet<>();
         candidates.add("1");
         candidates.add("2");
         candidates.add("3");
@@ -125,7 +124,7 @@ public class LeastResourceUsageWithWeightTest {
         LeastResourceUsageWithWeight strategy = new 
LeastResourceUsageWithWeight();
 
         // Should choice broker from broker1 2 3.
-        List<String> candidates = new ArrayList<>();
+        Set<String> candidates = new HashSet<>();
         candidates.add("1");
         candidates.add("2");
         candidates.add("3");
@@ -141,7 +140,7 @@ public class LeastResourceUsageWithWeightTest {
 
         LeastResourceUsageWithWeight strategy = new 
LeastResourceUsageWithWeight();
 
-        List<String> candidates = new ArrayList<>();
+        Set<String> candidates = new HashSet<>();
         var brokerLoadDataStore = ctx.brokerLoadDataStore();
         brokerLoadDataStore.pushAsync("1", createBrokerData(ctx,50, 100));
         brokerLoadDataStore.pushAsync("2", createBrokerData(ctx,100, 100));
@@ -189,7 +188,7 @@ public class LeastResourceUsageWithWeightTest {
                 1, 1, 1, 1, ctx.brokerConfiguration());
     }
 
-    public LoadManagerContext getContext() {
+    public static LoadManagerContext getContext() {
         var ctx = mock(LoadManagerContext.class);
         var conf = new ServiceConfiguration();
         conf.setLoadBalancerCPUResourceWeight(1.0);

Reply via email to