This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e677d5264 [improve][broker] PIP-192 updated metrics and cleanup broker selector (#19945)
32e677d5264 is described below

commit 32e677d5264d9145a5765735ae1e6ba0e88a5ef9
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Apr 5 05:00:32 2023 -0700

    [improve][broker] PIP-192 updated metrics and cleanup broker selector (#19945)
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  |   4 +-
 .../extensions/ExtensibleLoadManagerImpl.java      |  27 ++-
 .../channel/ServiceUnitStateChannelImpl.java       | 197 +++++++++++++++------
 .../extensions/models/AssignCounter.java           |  10 +-
 .../DefaultNamespaceBundleSplitStrategyImpl.java   |   3 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  |  93 ++++------
 .../channel/ServiceUnitStateChannelTest.java       |  35 ++--
 .../DefaultNamespaceBundleSplitStrategyTest.java   |   2 +-
 8 files changed, 212 insertions(+), 159 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index 1a2ca1ec4fb..0badbda1afd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -392,8 +392,8 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
-    static void createNamespaceIfAbsent(PulsarResources resources, 
NamespaceName namespaceName,
-            String cluster) throws IOException {
+    public static void createNamespaceIfAbsent(PulsarResources resources, 
NamespaceName namespaceName,
+                                               String cluster) throws 
IOException {
         createNamespaceIfAbsent(resources, namespaceName, cluster, 
DEFAULT_BUNDLE_NUMBER);
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 7e84fa5969a..c1234b6dab2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -164,8 +164,6 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
     private final UnloadCounter unloadCounter = new UnloadCounter();
     private final SplitCounter splitCounter = new SplitCounter();
 
-    // record load metrics
-    private final AtomicReference<List<Metrics>> brokerLoadMetrics = new 
AtomicReference<>();
     // record unload metrics
     private final AtomicReference<List<Metrics>> unloadMetrics = new 
AtomicReference();
     // record split metrics
@@ -338,7 +336,6 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                                 return 
serviceUnitStateChannel.publishAssignEventAsync(bundle, brokerOpt.get())
                                         .thenApply(Optional::of);
                             } else {
-                                assignCounter.incrementEmpty();
                                 throw new IllegalStateException(
                                         "Failed to select the new owner broker 
for bundle: " + bundle);
                             }
@@ -368,11 +365,17 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                 return CompletableFuture.completedFuture(brokerLookupData);
             }));
         });
-        future.whenComplete((r, t) -> lookupRequests.remove(bundle));
+        future.whenComplete((r, t) -> {
+                    if (t != null) {
+                        assignCounter.incrementFailure();
+                    }
+                    lookupRequests.remove(bundle);
+                }
+        );
         return future;
     }
 
-    private CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle) {
+    public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId 
bundle) {
         BrokerRegistry brokerRegistry = getBrokerRegistry();
         return brokerRegistry.getAvailableBrokerLookupDataAsync()
                 .thenCompose(availableBrokers -> {
@@ -633,20 +636,12 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         }
     }
 
-    void updateBrokerLoadMetrics(BrokerLoadData loadData) {
-        
this.brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
-    }
-
-    private void updateUnloadMetrics(UnloadDecision decision) {
-        unloadCounter.update(decision);
-        
this.unloadMetrics.set(unloadCounter.toMetrics(pulsar.getAdvertisedAddress()));
-    }
-
     public List<Metrics> getMetrics() {
         List<Metrics> metricsCollection = new ArrayList<>();
 
-        if (this.brokerLoadMetrics.get() != null) {
-            metricsCollection.addAll(this.brokerLoadMetrics.get());
+        if (this.brokerLoadDataReporter != null) {
+            metricsCollection.addAll(brokerLoadDataReporter.generateLoadData()
+                    .toMetrics(pulsar.getAdvertisedAddress()));
         }
         if (this.unloadMetrics.get() != null) {
             metricsCollection.addAll(this.unloadMetrics.get());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index e7fb59450b2..ec79698db1f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -41,6 +41,7 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUni
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Stable;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MetadataState.Unstable;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
 import static 
org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
 import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +66,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.pulsar.PulsarClusterMetadataSetup;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -76,8 +78,6 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import 
org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
-import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
-import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -104,7 +104,7 @@ import org.apache.pulsar.metadata.api.extended.SessionEvent;
 public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     public static final String TOPIC = TopicName.get(
             TopicDomain.persistent.value(),
-            NamespaceName.SYSTEM_NAMESPACE,
+            SYSTEM_NAMESPACE,
             "loadbalancer-service-unit-state").toString();
     private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30 
* 1000; // 30sec
     public static final long VERSION_ID_INIT = 1; // initial versionId
@@ -113,6 +113,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs 
to clean immediately
     private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS 
= 10;
     private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
+    private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 
* 60 * 1000;
     private final PulsarService pulsar;
     private final ServiceConfiguration config;
     private final Schema<ServiceUnitStateData> schema;
@@ -120,7 +121,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private final String lookupServiceAddress;
     private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> 
cleanupJobs;
     private final StateChangeListeners stateChangeListeners;
-    private BrokerSelectionStrategy brokerSelector;
+    private ExtensibleLoadManagerImpl loadManager;
     private BrokerRegistry brokerRegistry;
     private LeaderElectionService leaderElectionService;
     private TableView<ServiceUnitStateData> tableview;
@@ -144,11 +145,15 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private long totalInactiveBrokerCleanupIgnoredCnt = 0;
     private long totalInactiveBrokerCleanupCancelledCnt = 0;
     private volatile ChannelState channelState;
+    private volatile long lastOwnEventHandledAt = 0;
+    private long lastOwnedServiceUnitCountAt = 0;
+    private int totalOwnedServiceUnitCnt = 0;
 
     public enum EventType {
         Assign,
         Split,
-        Unload
+        Unload,
+        Override
 
     }
 
@@ -164,7 +169,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     // operation metrics
-    final Map<ServiceUnitState, AtomicLong> ownerLookUpCounters;
+    final Map<ServiceUnitState, Counters> ownerLookUpCounters;
     final Map<EventType, Counters> eventCounters;
     final Map<ServiceUnitState, Counters> handlerCounters;
 
@@ -207,11 +212,11 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
         this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS;
 
-        Map<ServiceUnitState, AtomicLong> tmpOwnerLookUpCounters = new 
HashMap<>();
+        Map<ServiceUnitState, Counters> tmpOwnerLookUpCounters = new 
HashMap<>();
         Map<ServiceUnitState, Counters> tmpHandlerCounters = new HashMap<>();
         Map<EventType, Counters> tmpEventCounters = new HashMap<>();
         for (var state : ServiceUnitState.values()) {
-            tmpOwnerLookUpCounters.put(state, new AtomicLong());
+            tmpOwnerLookUpCounters.put(state, new Counters());
             tmpHandlerCounters.put(state, new Counters());
         }
         for (var event : EventType.values()) {
@@ -267,7 +272,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 log.warn("Failed to find the channel leader.");
             }
             this.channelState = LeaderElectionServiceStarted;
-            brokerSelector = getBrokerSelector();
+            loadManager = getLoadManager();
 
             if (producer != null) {
                 producer.close();
@@ -275,6 +280,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                     log.info("Closed the channel producer.");
                 }
             }
+            PulsarClusterMetadataSetup.createNamespaceIfAbsent
+                    (pulsar.getPulsarResources(), 
NamespaceName.SYSTEM_NAMESPACE, config.getClusterName());
+
             producer = pulsar.getClient().newProducer(schema)
                     .enableBatching(true)
                     .maxPendingMessages(MAX_OUTSTANDING_PUB_MESSAGES)
@@ -329,9 +337,8 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     @VisibleForTesting
-    protected BrokerSelectionStrategy getBrokerSelector() {
-        // TODO: make this selector configurable.
-        return new LeastResourceUsageWithWeight();
+    protected ExtensibleLoadManagerImpl getLoadManager() {
+        return ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get());
     }
 
     @VisibleForTesting
@@ -465,7 +472,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
         ServiceUnitStateData data = tableview.get(serviceUnit);
         ServiceUnitState state = state(data);
-        ownerLookUpCounters.get(state).incrementAndGet();
+        ownerLookUpCounters.get(state).getTotal().incrementAndGet();
         switch (state) {
             case Owned -> {
                 return 
CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
@@ -474,16 +481,22 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 return 
CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
             }
             case Assigning, Releasing -> {
-                return deferGetOwnerRequest(serviceUnit).thenApply(
+                return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) 
-> {
+                    if (e != null) {
+                        
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
+                    }
+                }).thenApply(
                         broker -> broker == null ? Optional.empty() : 
Optional.of(broker));
             }
             case Init, Free -> {
                 return CompletableFuture.completedFuture(Optional.empty());
             }
             case Deleted -> {
+                ownerLookUpCounters.get(state).getFailure().incrementAndGet();
                 return CompletableFuture.failedFuture(new 
IllegalArgumentException(serviceUnit + " is deleted."));
             }
             default -> {
+                ownerLookUpCounters.get(state).getFailure().incrementAndGet();
                 String errorMsg = String.format("Failed to process service 
unit state data: %s when get owner.", data);
                 log.error(errorMsg);
                 return CompletableFuture.failedFuture(new 
IllegalStateException(errorMsg));
@@ -522,6 +535,23 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         return getOwnerRequest;
     }
 
+    private CompletableFuture<Void> publishOverrideEventAsync(String 
serviceUnit,
+                                           ServiceUnitStateData orphanData,
+                                           ServiceUnitStateData override) {
+        if (!validateChannelState(Started, true)) {
+            throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
+        }
+        EventType eventType = EventType.Override;
+        eventCounters.get(eventType).getTotal().incrementAndGet();
+        return pubAsync(serviceUnit, override).whenComplete((__, e) -> {
+            if (e != null) {
+                eventCounters.get(eventType).getFailure().incrementAndGet();
+                log.error("Failed to override serviceUnit:{} from 
orphanData:{} to overrideData:{}",
+                        serviceUnit, orphanData, override, e);
+            }
+        }).thenApply(__ -> null);
+    }
+
     public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
         if (!validateChannelState(Started, true)) {
             return CompletableFuture.failedFuture(
@@ -658,6 +688,7 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         stateChangeListeners.notify(serviceUnit, data, null);
         if (isTargetBroker(data.dstBroker())) {
             log(null, serviceUnit, data, null);
+            lastOwnEventHandledAt = System.currentTimeMillis();
         }
     }
 
@@ -1022,33 +1053,34 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 broker, delayInSecs, cleanupJobs.size());
     }
 
-    private void overrideOwnership(String serviceUnit, ServiceUnitStateData 
orphanData, Set<String> availableBrokers) {
-
-        Optional<String> selectedBroker = 
brokerSelector.select(availableBrokers, null, getContext());
+    private void overrideOwnership(String serviceUnit, ServiceUnitStateData 
orphanData) {
+        Optional<String> selectedBroker = selectBroker(serviceUnit);
         if (selectedBroker.isPresent()) {
-            var override = new ServiceUnitStateData(Owned, 
selectedBroker.get(), true, getNextVersionId(orphanData));
+            var override =
+                    new ServiceUnitStateData(Owned, selectedBroker.get(), 
true, getNextVersionId(orphanData));
             log.info("Overriding ownership serviceUnit:{} from orphanData:{} 
to overrideData:{}",
                     serviceUnit, orphanData, override);
-            pubAsync(serviceUnit, override).whenComplete((__, e) -> {
-                if (e != null) {
-                    log.error("Failed to override serviceUnit:{} from 
orphanData:{} to overrideData:{}",
-                            serviceUnit, orphanData, override, e);
-                }
-            });
+            publishOverrideEventAsync(serviceUnit, orphanData, override)
+                    .exceptionally(e -> {
+                        log.error(
+                                "Failed to override the ownership 
serviceUnit:{} orphanData:{}. "
+                                        + "Failed to publish override event. 
totalCleanupErrorCnt:{}",
+                                serviceUnit, orphanData, 
totalCleanupErrorCnt.incrementAndGet());
+                        return null;
+                    });
         } else {
-            log.error("Failed to override the ownership serviceUnit:{} 
orphanData:{}. Empty selected broker.",
-                    serviceUnit, orphanData);
+            log.error("Failed to override the ownership serviceUnit:{} 
orphanData:{}. Empty selected broker. "
+                            + "totalCleanupErrorCnt:{}",
+                    serviceUnit, orphanData, 
totalCleanupErrorCnt.incrementAndGet());
         }
     }
 
 
-    private void doCleanup(String broker) throws ExecutionException, 
InterruptedException, TimeoutException {
+    private void doCleanup(String broker)  {
         long startTime = System.nanoTime();
         log.info("Started ownership cleanup for the inactive broker:{}", 
broker);
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
-        var availableBrokers = new 
HashSet<>(brokerRegistry.getAvailableBrokersAsync()
-                .get(inFlightStateWaitingTimeInMillis, MILLISECONDS));
 
         for (var etr : tableview.entrySet()) {
             var stateData = etr.getValue();
@@ -1056,13 +1088,13 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             var state = state(stateData);
             if (StringUtils.equals(broker, stateData.dstBroker())) {
                 if (isActiveState(state)) {
-                    overrideOwnership(serviceUnit, stateData, 
availableBrokers);
+                    overrideOwnership(serviceUnit, stateData);
                     orphanServiceUnitCleanupCnt++;
                 }
 
             } else if (StringUtils.equals(broker, stateData.sourceBroker())) {
                 if (isInFlightState(state)) {
-                    overrideOwnership(serviceUnit, stateData, 
availableBrokers);
+                    overrideOwnership(serviceUnit, stateData);
                     orphanServiceUnitCleanupCnt++;
                 }
             }
@@ -1097,31 +1129,40 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     }
 
-    private Optional<ServiceUnitStateData> getRollForwardStateData(
-            Set<String> availableBrokers, LoadManagerContext context, long 
nextVersionId) {
-        Optional<String> selectedBroker = 
brokerSelector.select(availableBrokers, null, context);
+    private Optional<String> selectBroker(String serviceUnit) {
+        try {
+            return loadManager.selectAsync(getNamespaceBundle(serviceUnit))
+                    .get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
+        } catch (Throwable e) {
+            log.error("Failed to select a broker for serviceUnit:{}", 
serviceUnit);
+        }
+        return Optional.empty();
+    }
+
+    private Optional<ServiceUnitStateData> getRollForwardStateData(String 
serviceUnit, long nextVersionId) {
+        Optional<String> selectedBroker = selectBroker(serviceUnit);
         if (selectedBroker.isEmpty()) {
             return Optional.empty();
         }
         return Optional.of(new ServiceUnitStateData(Owned, 
selectedBroker.get(), true, nextVersionId));
     }
 
+
     private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
             String serviceUnit, ServiceUnitStateData orphanData,
-            Set<String> availableBrokers,
-            LoadManagerContext context) {
+            Set<String> availableBrokers) {
         long nextVersionId = getNextVersionId(orphanData);
         var state = orphanData.state();
         switch (state) {
             case Assigning: {
-                return getRollForwardStateData(availableBrokers, context, 
nextVersionId);
+                return getRollForwardStateData(serviceUnit, nextVersionId);
             }
             case Splitting, Releasing: {
                 if (availableBrokers.contains(orphanData.sourceBroker())) {
                     // rollback to the src
                     return Optional.of(new ServiceUnitStateData(Owned, 
orphanData.sourceBroker(), true, nextVersionId));
                 } else {
-                    return getRollForwardStateData(availableBrokers, context, 
nextVersionId);
+                    return getRollForwardStateData(serviceUnit, nextVersionId);
                 }
             }
             default: {
@@ -1202,25 +1243,27 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 handleBrokerDeletionEvent(inactiveBroker);
             }
         } else if (!orphanServiceUnits.isEmpty()) {
-            var context = getContext();
             for (var etr : orphanServiceUnits.entrySet()) {
                 var orphanServiceUnit = etr.getKey();
                 var orphanData = etr.getValue();
                 var overrideData = getOverrideInFlightStateData(
-                        orphanServiceUnit, orphanData, activeBrokers, context);
+                        orphanServiceUnit, orphanData, activeBrokers);
                 if (overrideData.isPresent()) {
-                    pubAsync(orphanServiceUnit, 
overrideData.get()).whenComplete((__, e) -> {
-                        if (e != null) {
-                            log.error("Failed cleaning the ownership 
orphanServiceUnit:{}, orphanData:{}, "
-                                            + "cleanupErrorCnt:{}.",
-                                    orphanServiceUnit, orphanData,
-                                    totalCleanupErrorCnt.incrementAndGet() - 
totalCleanupErrorCntStart, e);
-                        }
-                    });
+                    publishOverrideEventAsync(orphanServiceUnit, orphanData, 
overrideData.get())
+                            .whenComplete((__, e) -> {
+                                if (e != null) {
+                                    log.error("Failed cleaning the ownership 
orphanServiceUnit:{}, orphanData:{}, "
+                                                    + "cleanupErrorCnt:{}.",
+                                            orphanServiceUnit, orphanData,
+                                            
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
+                                }
+                            });
                     orphanServiceUnitCleanupCnt++;
                 } else {
-                    log.warn("Failed get the overrideStateData from 
orphanServiceUnit:{}, orphanData:{}. will retry..",
-                            orphanServiceUnit, orphanData);
+                    log.warn("Failed get the overrideStateData from 
orphanServiceUnit:{}, orphanData:{},"
+                                    + " cleanupErrorCnt:{}. will retry..",
+                            orphanServiceUnit, orphanData,
+                            totalCleanupErrorCnt.incrementAndGet() - 
totalCleanupErrorCntStart);
                 }
             }
         }
@@ -1279,6 +1322,25 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         );
     }
 
+    private int getTotalOwnedServiceUnitCnt() {
+        if (tableview == null) {
+            return 0;
+        }
+        long now = System.currentTimeMillis();
+        if (lastOwnEventHandledAt > lastOwnedServiceUnitCountAt
+                || now - lastOwnedServiceUnitCountAt > 
MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS) {
+            int cnt = 0;
+            for (var data : tableview.values()) {
+                if (data.state() == Owned && isTargetBroker(data.dstBroker())) 
{
+                    cnt++;
+                }
+            }
+            lastOwnedServiceUnitCountAt = now;
+            totalOwnedServiceUnitCnt = cnt;
+        }
+        return totalOwnedServiceUnitCnt;
+    }
+
 
     @Override
     public List<Metrics> getMetrics() {
@@ -1288,11 +1350,25 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         dimensions.put("broker", pulsar.getAdvertisedAddress());
 
         for (var etr : ownerLookUpCounters.entrySet()) {
-            var dim = new HashMap<>(dimensions);
-            dim.put("state", etr.getKey().toString());
-            var metric = Metrics.create(dim);
-            metric.put("brk_sunit_state_chn_owner_lookup_total", 
etr.getValue());
-            metrics.add(metric);
+            {
+                var dim = new HashMap<>(dimensions);
+                dim.put("state", etr.getKey().toString());
+                dim.put("result", "Total");
+                var metric = Metrics.create(dim);
+                metric.put("brk_sunit_state_chn_owner_lookup_total",
+                        etr.getValue().getTotal().get());
+                metrics.add(metric);
+            }
+
+            {
+                var dim = new HashMap<>(dimensions);
+                dim.put("state", etr.getKey().toString());
+                dim.put("result", "Failure");
+                var metric = Metrics.create(dim);
+                metric.put("brk_sunit_state_chn_owner_lookup_total",
+                        etr.getValue().getFailure().get());
+                metrics.add(metric);
+            }
         }
 
         for (var etr : eventCounters.entrySet()) {
@@ -1371,10 +1447,19 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
             metrics.add(metric);
         }
 
+        {
+            var dim = new HashMap<>(dimensions);
+            dim.put("result", "Success");
+            var metric = Metrics.create(dim);
+            
metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", 
totalInactiveBrokerCleanupCnt);
+            metrics.add(metric);
+        }
+
         var metric = Metrics.create(dimensions);
-        metric.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", 
totalInactiveBrokerCleanupCnt);
+
         metric.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total", 
totalOrphanServiceUnitCleanupCnt);
         metric.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total", 
totalServiceUnitTombstoneCleanupCnt);
+        metric.put("brk_sunit_state_chn_owned_su_total", 
getTotalOwnedServiceUnitCnt());
         metrics.add(metric);
 
         return metrics;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java
index 26ff1f5f401..8e19b7b45e2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/AssignCounter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.models;
 
-import static 
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Empty;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Failure;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Skip;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter.Label.Success;
 import java.util.ArrayList;
@@ -35,7 +35,7 @@ public class AssignCounter {
 
     enum Label {
         Success,
-        Empty,
+        Failure,
         Skip,
     }
 
@@ -44,7 +44,7 @@ public class AssignCounter {
     public AssignCounter() {
         breakdownCounters = Map.of(
                 Success, new AtomicLong(),
-                Empty, new AtomicLong(),
+                Failure, new AtomicLong(),
                 Skip, new AtomicLong()
         );
     }
@@ -54,8 +54,8 @@ public class AssignCounter {
         breakdownCounters.get(Success).incrementAndGet();
     }
 
-    public void incrementEmpty() {
-        breakdownCounters.get(Empty).incrementAndGet();
+    public void incrementFailure() {
+        breakdownCounters.get(Failure).incrementAndGet();
     }
 
     public void incrementSkip() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
index 15bfdc747f1..bd8b3aa6654 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
@@ -98,9 +98,8 @@ public class DefaultNamespaceBundleSplitStrategyImpl 
implements NamespaceBundleS
 
             if (!channel.isOwner(bundle)) {
                 if (debug) {
-                    log.error(String.format(CANNOT_SPLIT_BUNDLE_MSG
+                    log.warn(String.format(CANNOT_SPLIT_BUNDLE_MSG
                             + " This broker is not the owner.", bundle));
-                    counter.update(Failure, Unknown);
                 }
                 continue;
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 502e02e465e..a9b145e3029 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -37,6 +37,7 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -59,7 +60,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import java.net.URL;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -81,13 +81,11 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.models.AssignCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitCounter;
 import org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
+import 
org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TenantResources;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.TableViewImpl;
@@ -97,11 +95,8 @@ import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.stats.Metrics;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
@@ -122,8 +117,6 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 
     private PulsarTestContext additionalPulsarTestContext;
 
-    private PulsarResources resources;
-
     private ExtensibleLoadManagerImpl primaryLoadManager;
 
     private ExtensibleLoadManagerImpl secondaryLoadManager;
@@ -164,37 +157,6 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                 Sets.newHashSet(this.conf.getClusterName()));
     }
 
-    protected void beforePulsarStart(PulsarService pulsar) throws Exception {
-        if (resources == null) {
-            MetadataStoreExtended localStore = 
pulsar.createLocalMetadataStore(null);
-            MetadataStoreExtended configStore = (MetadataStoreExtended) 
pulsar.createConfigurationMetadataStore(null);
-            resources = new PulsarResources(localStore, configStore);
-        }
-        this.createNamespaceIfNotExists(resources, 
NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                NamespaceName.SYSTEM_NAMESPACE);
-    }
-
-    protected void createNamespaceIfNotExists(PulsarResources resources,
-                                              String publicTenant,
-                                              NamespaceName ns) throws 
Exception {
-        TenantResources tr = resources.getTenantResources();
-        NamespaceResources nsr = resources.getNamespaceResources();
-
-        if (!tr.tenantExists(publicTenant)) {
-            tr.createTenant(publicTenant,
-                    TenantInfo.builder()
-                            
.adminRoles(Sets.newHashSet(conf.getSuperUserRoles()))
-                            
.allowedClusters(Sets.newHashSet(conf.getClusterName()))
-                            .build());
-        }
-
-        if (!nsr.namespaceExists(ns)) {
-            Policies nsp = new Policies();
-            nsp.replication_clusters = 
Collections.singleton(conf.getClusterName());
-            nsr.createPolicies(ns, nsp);
-        }
-    }
-
     @Override
     @AfterClass
     protected void cleanup() throws Exception {
@@ -635,8 +597,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testGetMetrics() throws Exception {
         {
-            var brokerLoadMetrics = (AtomicReference<List<Metrics>>)
-                    FieldUtils.readDeclaredField(primaryLoadManager, 
"brokerLoadMetrics", true);
+            var brokerLoadDataReporter = mock(BrokerLoadDataReporter.class);
+            FieldUtils.writeDeclaredField(primaryLoadManager, 
"brokerLoadDataReporter", brokerLoadDataReporter, true);
             BrokerLoadData loadData = new BrokerLoadData();
             SystemResourceUsage usage = new SystemResourceUsage();
             var cpu = new ResourceUsage(1.0, 100.0);
@@ -650,7 +612,7 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             usage.setBandwidthIn(bandwidthIn);
             usage.setBandwidthOut(bandwidthOut);
             loadData.update(usage, 1, 2, 3, 4, 5, 6, conf);
-            
brokerLoadMetrics.set(loadData.toMetrics(pulsar.getAdvertisedAddress()));
+            doReturn(loadData).when(brokerLoadDataReporter).generateLoadData();
         }
         {
             var unloadMetrics = (AtomicReference<List<Metrics>>)
@@ -700,8 +662,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         {
             AssignCounter assignCounter = new AssignCounter();
             assignCounter.incrementSuccess();
-            assignCounter.incrementEmpty();
-            assignCounter.incrementEmpty();
+            assignCounter.incrementFailure();
+            assignCounter.incrementFailure();
             assignCounter.incrementSkip();
             assignCounter.incrementSkip();
             assignCounter.incrementSkip();
@@ -709,7 +671,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         }
 
         {
-
+            FieldUtils.writeDeclaredField(channel1, 
"lastOwnedServiceUnitCountAt", System.currentTimeMillis(), true);
+            FieldUtils.writeDeclaredField(channel1, 
"totalOwnedServiceUnitCnt", 10, true);
             FieldUtils.writeDeclaredField(channel1, 
"totalInactiveBrokerCleanupCnt", 1, true);
             FieldUtils.writeDeclaredField(channel1, 
"totalServiceUnitTombstoneCleanupCnt", 2, true);
             FieldUtils.writeDeclaredField(channel1, 
"totalOrphanServiceUnitCleanupCnt", 3, true);
@@ -718,21 +681,21 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             FieldUtils.writeDeclaredField(channel1, 
"totalInactiveBrokerCleanupIgnoredCnt", 6, true);
             FieldUtils.writeDeclaredField(channel1, 
"totalInactiveBrokerCleanupCancelledCnt", 7, true);
 
-            Map<ServiceUnitState, AtomicLong> ownerLookUpCounters = new 
LinkedHashMap<>();
+            Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters> 
ownerLookUpCounters = new LinkedHashMap<>();
             Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters> 
handlerCounters = new LinkedHashMap<>();
             Map<ServiceUnitStateChannelImpl.EventType, 
ServiceUnitStateChannelImpl.Counters> eventCounters =
                     new LinkedHashMap<>();
-            int i = 1;
             int j = 0;
             for (var state : ServiceUnitState.values()) {
-                ownerLookUpCounters.put(state, new AtomicLong(i));
+                ownerLookUpCounters.put(state,
+                        new ServiceUnitStateChannelImpl.Counters(
+                                new AtomicLong(j + 1), new AtomicLong(j + 2)));
                 handlerCounters.put(state,
                         new ServiceUnitStateChannelImpl.Counters(
                                 new AtomicLong(j + 1), new AtomicLong(j + 2)));
-                i++;
                 j += 2;
             }
-            i = 0;
+            int i = 0;
             for (var type : ServiceUnitStateChannelImpl.EventType.values()) {
                 eventCounters.put(type,
                         new ServiceUnitStateChannelImpl.Counters(
@@ -769,22 +732,31 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                         dimensions=[{broker=localhost, metric=bundlesSplit, 
reason=Bandwidth, result=Success}], 
metrics=[{brk_lb_bundles_split_breakdown_total=4}]
                         dimensions=[{broker=localhost, metric=bundlesSplit, 
reason=Admin, result=Success}], 
metrics=[{brk_lb_bundles_split_breakdown_total=5}]
                         dimensions=[{broker=localhost, metric=bundlesSplit, 
reason=Unknown, result=Failure}], 
metrics=[{brk_lb_bundles_split_breakdown_total=6}]
-                        dimensions=[{broker=localhost, metric=assign, 
result=Empty}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
+                        dimensions=[{broker=localhost, metric=assign, 
result=Failure}], metrics=[{brk_lb_assign_broker_breakdown_total=2}]
                         dimensions=[{broker=localhost, metric=assign, 
result=Skip}], metrics=[{brk_lb_assign_broker_breakdown_total=3}]
                         dimensions=[{broker=localhost, metric=assign, 
result=Success}], metrics=[{brk_lb_assign_broker_breakdown_total=1}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn, 
state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn, 
state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn, 
state=Owned}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn, 
state=Assigning}], metrics=[{brk_sunit_state_chn_owner_lookup_total=4}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn, 
state=Releasing}], metrics=[{brk_sunit_state_chn_owner_lookup_total=5}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn, 
state=Splitting}], metrics=[{brk_sunit_state_chn_owner_lookup_total=6}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn, 
state=Deleted}], metrics=[{brk_sunit_state_chn_owner_lookup_total=7}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Total, state=Init}], metrics=[{brk_sunit_state_chn_owner_lookup_total=1}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Failure, state=Init}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=2}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Total, state=Free}], metrics=[{brk_sunit_state_chn_owner_lookup_total=3}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Failure, state=Free}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=4}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Total, state=Owned}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=5}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Failure, state=Owned}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=6}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Total, state=Assigning}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=7}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Failure, state=Assigning}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=8}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Total, state=Releasing}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=9}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Failure, state=Releasing}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=10}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Total, state=Splitting}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=11}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Failure, state=Splitting}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=12}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Total, state=Deleted}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=13}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Failure, state=Deleted}], 
metrics=[{brk_sunit_state_chn_owner_lookup_total=14}]
                         dimensions=[{broker=localhost, event=Assign, 
metric=sunitStateChn, result=Total}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=1}]
                         dimensions=[{broker=localhost, event=Assign, 
metric=sunitStateChn, result=Failure}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=2}]
                         dimensions=[{broker=localhost, event=Split, 
metric=sunitStateChn, result=Total}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=3}]
                         dimensions=[{broker=localhost, event=Split, 
metric=sunitStateChn, result=Failure}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=4}]
                         dimensions=[{broker=localhost, event=Unload, 
metric=sunitStateChn, result=Total}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=5}]
                         dimensions=[{broker=localhost, event=Unload, 
metric=sunitStateChn, result=Failure}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=6}]
+                        dimensions=[{broker=localhost, event=Override, 
metric=sunitStateChn, result=Total}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=7}]
+                        dimensions=[{broker=localhost, event=Override, 
metric=sunitStateChn, result=Failure}], 
metrics=[{brk_sunit_state_chn_event_publish_ops_total=8}]
                         dimensions=[{broker=localhost, event=Init, 
metric=sunitStateChn, result=Total}], 
metrics=[{brk_sunit_state_chn_subscribe_ops_total=1}]
                         dimensions=[{broker=localhost, event=Init, 
metric=sunitStateChn, result=Failure}], 
metrics=[{brk_sunit_state_chn_subscribe_ops_total=2}]
                         dimensions=[{broker=localhost, event=Free, 
metric=sunitStateChn, result=Total}], 
metrics=[{brk_sunit_state_chn_subscribe_ops_total=3}]
@@ -803,7 +775,8 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                         dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Skip}], 
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=6}]
                         dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Cancel}], 
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=7}]
                         dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Schedule}], 
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]
-                        dimensions=[{broker=localhost, metric=sunitStateChn}], 
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1, 
brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, 
brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Success}], 
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1}]
+                        dimensions=[{broker=localhost, metric=sunitStateChn}], 
metrics=[{brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, 
brk_sunit_state_chn_owned_su_total=10, 
brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
                         """.split("\n"));
         var actual = primaryLoadManager.getMetrics().stream().map(m -> 
m.toString()).collect(Collectors.toSet());
         assertEquals(actual, expected);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 9eda98e5d84..2d98bc5cae6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -75,11 +75,11 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
 import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
-import 
org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.Producer;
@@ -116,7 +116,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
     private BrokerRegistryImpl registry;
 
-    private BrokerSelectionStrategy brokerSelector;
+    private ExtensibleLoadManagerImpl loadManager;
 
     @BeforeClass
     @Override
@@ -135,7 +135,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         loadManagerContext = mock(LoadManagerContext.class);
         
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).brokerLoadDataStore();
         
doReturn(mock(LoadDataStore.class)).when(loadManagerContext).topBundleLoadDataStore();
-        brokerSelector = mock(BrokerSelectionStrategy.class);
+        loadManager = mock(ExtensibleLoadManagerImpl.class);
         additionalPulsarTestContext = 
createAdditionalPulsarTestContext(getDefaultConf());
         pulsar2 = additionalPulsarTestContext.getPulsarService();
 
@@ -496,7 +496,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(0, getOwnerRequests2.size());
 
         // recovered, check the monitor update state : Assigned -> Owned
-        
doReturn(Optional.of(lookupServiceAddress1)).when(brokerSelector).select(any(), 
any(), any());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))).when(loadManager).selectAsync(any());
         FieldUtils.writeDeclaredField(channel2, "producer", producer, true);
         FieldUtils.writeDeclaredField(channel1,
                 "inFlightStateWaitingTimeInMillis", 1 , true);
@@ -714,7 +714,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         var owner1 = channel1.getOwnerAsync(bundle1);
         var owner2 = channel2.getOwnerAsync(bundle2);
-        
doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(), 
any(), any());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
 
         assertTrue(owner1.get().isEmpty());
         assertTrue(owner2.get().isEmpty());
@@ -1076,7 +1076,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
         FieldUtils.writeDeclaredField(channel2,
                 "inFlightStateWaitingTimeInMillis", 3 * 1000, true);
-        
doReturn(Optional.of(lookupServiceAddress2)).when(brokerSelector).select(any(), 
any(), any());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress2))).when(loadManager).selectAsync(any());
         channel1.publishAssignEventAsync(bundle, lookupServiceAddress2);
         // channel1 is broken. the assign won't be complete.
         waitUntilState(channel1, bundle);
@@ -1418,11 +1418,12 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
         }
 
         var ownerLookUpCounters =
-                (Map<ServiceUnitStateChannelImpl.EventType, AtomicLong>)
+                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
                         FieldUtils.readDeclaredField(channel, 
"ownerLookUpCounters", true);
 
         for(var val : ownerLookUpCounters.values()){
-            val.set(0);
+            val.getFailure().set(0);
+            val.getTotal().set(0);
         }
     }
 
@@ -1518,20 +1519,20 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
                                                     )
             throws IllegalAccessException {
         var ownerLookUpCounters =
-                (Map<ServiceUnitState, AtomicLong>)
+                (Map<ServiceUnitState, ServiceUnitStateChannelImpl.Counters>)
                         FieldUtils.readDeclaredField(channel, 
"ownerLookUpCounters", true);
 
         Awaitility.await()
                 .pollInterval(200, TimeUnit.MILLISECONDS)
                 .atMost(10, TimeUnit.SECONDS)
                 .untilAsserted(() -> { // wait until true
-                    assertEquals(assigned, 
ownerLookUpCounters.get(Assigning).get());
-                    assertEquals(owned, ownerLookUpCounters.get(Owned).get());
-                    assertEquals(released, 
ownerLookUpCounters.get(Releasing).get());
-                    assertEquals(splitting, 
ownerLookUpCounters.get(Splitting).get());
-                    assertEquals(free, ownerLookUpCounters.get(Free).get());
-                    assertEquals(deleted, 
ownerLookUpCounters.get(Deleted).get());
-                    assertEquals(init, ownerLookUpCounters.get(Init).get());
+                    assertEquals(assigned, 
ownerLookUpCounters.get(Assigning).getTotal().get());
+                    assertEquals(owned, 
ownerLookUpCounters.get(Owned).getTotal().get());
+                    assertEquals(released, 
ownerLookUpCounters.get(Releasing).getTotal().get());
+                    assertEquals(splitting, 
ownerLookUpCounters.get(Splitting).getTotal().get());
+                    assertEquals(free, 
ownerLookUpCounters.get(Free).getTotal().get());
+                    assertEquals(deleted, 
ownerLookUpCounters.get(Deleted).getTotal().get());
+                    assertEquals(init, 
ownerLookUpCounters.get(Init).getTotal().get());
                 });
     }
 
@@ -1565,7 +1566,7 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
         doReturn(loadManagerContext).when(channel).getContext();
         doReturn(registry).when(channel).getBrokerRegistry();
-        doReturn(brokerSelector).when(channel).getBrokerSelector();
+        doReturn(loadManager).when(channel).getLoadManager();
 
 
         var leaderElectionService = new LeaderElectionService(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
index 0aa055b58ac..8c765e7a3df 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyTest.java
@@ -169,7 +169,7 @@ public class DefaultNamespaceBundleSplitStrategyTest {
         var actual = strategy.findBundlesToSplit(loadManagerContext, pulsar);
         var expected = Set.of();
         assertEquals(actual, expected);
-        verify(counter, times(2)).update(eq(SplitDecision.Label.Failure), 
eq(Unknown));
+        verify(counter, times(0)).update(eq(SplitDecision.Label.Failure), 
eq(Unknown));
     }
 
     public void testError() throws Exception {

Reply via email to