This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 28ed48e8138 [improve][broker] PIP-307: Add monitoring metrics for graceful closure of producers/consumers (#21854)
28ed48e8138 is described below

commit 28ed48e813843c68e369b1d618de1e7455b191de
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jan 9 16:42:43 2024 -0800

    [improve][broker] PIP-307: Add monitoring metrics for graceful closure of producers/consumers (#21854)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  16 +++-
 .../channel/ServiceUnitStateChannelImpl.java       |   1 -
 .../extensions/channel/StateChangeListeners.java   |  16 +++-
 .../extensions/manager/StateChangeListener.java    |  10 +-
 .../extensions/manager/UnloadManager.java          | 105 ++++++++++++++++++---
 .../extensions/models/UnloadCounter.java           |   1 -
 .../apache/pulsar/broker/service/ServerCnx.java    |  28 ++++--
 .../broker/stats/prometheus/metrics/Summary.java   |   2 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  |   9 +-
 .../extensions/manager/UnloadManagerTest.java      |  11 +--
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  19 +++-
 11 files changed, 181 insertions(+), 37 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 581183cf95a..4664a840c92 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -181,7 +181,9 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
 
     // Record the ignored send msg count during unloading
     @Getter
-    private final AtomicLong ignoredSendMsgCounter = new AtomicLong();
+    private final AtomicLong ignoredSendMsgCount = new AtomicLong();
+    @Getter
+    private final AtomicLong ignoredAckCount = new AtomicLong();
 
     // record unload metrics
     private final AtomicReference<List<Metrics>> unloadMetrics = new 
AtomicReference<>();
@@ -361,7 +363,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
             this.serviceUnitStateChannel = new 
ServiceUnitStateChannelImpl(pulsar);
             this.brokerRegistry.start();
             this.splitManager = new SplitManager(splitCounter);
-            this.unloadManager = new UnloadManager(unloadCounter);
+            this.unloadManager = new UnloadManager(unloadCounter, 
pulsar.getLookupServiceAddress());
             this.serviceUnitStateChannel.listen(unloadManager);
             this.serviceUnitStateChannel.listen(splitManager);
             this.leaderElectionService.start();
@@ -874,12 +876,20 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         }
 
         
metricsCollection.addAll(this.assignCounter.toMetrics(pulsar.getAdvertisedAddress()));
-
         metricsCollection.addAll(this.serviceUnitStateChannel.getMetrics());
+        
metricsCollection.addAll(getIgnoredCommandMetrics(pulsar.getAdvertisedAddress()));
 
         return metricsCollection;
     }
 
+    private List<Metrics> getIgnoredCommandMetrics(String 
advertisedBrokerAddress) {
+        var dimensions = Map.of("broker", advertisedBrokerAddress, "metric", 
"bundleUnloading");
+        var metric = Metrics.create(dimensions);
+        metric.put("brk_lb_ignored_ack_total", ignoredAckCount.get());
+        metric.put("brk_lb_ignored_send_total", ignoredSendMsgCount.get());
+        return List.of(metric);
+    }
+
     private void monitor() {
         try {
             initWaiter.await();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index bd571284346..118667d2575 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -160,7 +160,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
         Split,
         Unload,
         Override
-
     }
 
     @Getter
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
index 1d396f500b6..f0d99e931bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
@@ -51,7 +51,21 @@ public class StateChangeListeners {
     public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T> 
future,
                                                        String serviceUnit,
                                                        ServiceUnitStateData 
data) {
-        return future.whenComplete((r, ex) -> notify(serviceUnit, data, ex));
+        return notifyOnArrival(serviceUnit, data).
+                thenCombine(future, (unused, t) -> t).
+                whenComplete((r, ex) -> notify(serviceUnit, data, ex));
+    }
+
+    private CompletableFuture<Void> notifyOnArrival(String serviceUnit, 
ServiceUnitStateData data) {
+        stateChangeListeners.forEach(listener -> {
+            try {
+                listener.beforeEvent(serviceUnit, data);
+            } catch (Throwable ex) {
+                log.error("StateChangeListener: {} exception while notifying 
arrival event {} for service unit {}",
+                        listener, data, serviceUnit, ex);
+            }
+        });
+        return CompletableFuture.completedFuture(null);
     }
 
     public void notify(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
index 7ba8be8771b..0d26859f82e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
@@ -23,7 +23,15 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateD
 public interface StateChangeListener {
 
     /**
-     * Handle the service unit state change.
+     * Called before the state change is handled.
+     *
+     * @param serviceUnit - Service Unit(Namespace bundle).
+     * @param data - Service unit state data.
+     */
+    default void beforeEvent(String serviceUnit, ServiceUnitStateData data) { }
+
+    /**
+     * Called after the service unit state change has been handled.
      *
      * @param serviceUnit - Service Unit(Namespace bundle).
      * @param data - Service unit state data.
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index ffdbbc2af42..a613d48575d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -20,7 +20,10 @@ package 
org.apache.pulsar.broker.loadbalance.extensions.manager;
 
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.Histogram;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -38,13 +41,77 @@ public class UnloadManager implements StateChangeListener {
 
     private final UnloadCounter counter;
     private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+    private final String lookupServiceAddress;
 
-    public UnloadManager(UnloadCounter counter) {
+    @VisibleForTesting
+    public enum LatencyMetric {
+        UNLOAD(buildHistogram(
+            "brk_lb_unload_latency", "Total time duration of unload operations 
on source brokers"), true, false),
+        ASSIGN(buildHistogram(
+            "brk_lb_assign_latency", "Time spent in the load balancing ASSIGN 
state on destination brokers"),
+                false, true),
+        RELEASE(buildHistogram(
+            "brk_lb_release_latency", "Time spent in the load balancing 
RELEASE state on source brokers"), true, false),
+        DISCONNECT(buildHistogram(
+            "brk_lb_disconnect_latency", "Time spent in the load balancing 
disconnected state on source brokers"),
+                true, false);
+
+        private static Histogram buildHistogram(String name, String help) {
+            return Histogram.build(name, help).unit("ms").labelNames("broker", 
"metric").
+                    buckets(new double[] {1.0, 10.0, 100.0, 200.0, 
1000.0}).register();
+        }
+        private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);
+
+        private final Histogram histogram;
+        private final Map<String, CompletableFuture<Void>> futures = new 
ConcurrentHashMap<>();
+        private final boolean isSourceBrokerMetric;
+        private final boolean isDestinationBrokerMetric;
+
+        LatencyMetric(Histogram histogram, boolean isSourceBrokerMetric, 
boolean isDestinationBrokerMetric) {
+            this.histogram = histogram;
+            this.isSourceBrokerMetric = isSourceBrokerMetric;
+            this.isDestinationBrokerMetric = isDestinationBrokerMetric;
+        }
+
+        public void beginMeasurement(String serviceUnit, String 
lookupServiceAddress, ServiceUnitStateData data) {
+            if ((isSourceBrokerMetric && 
lookupServiceAddress.equals(data.sourceBroker()))
+                    || (isDestinationBrokerMetric && 
lookupServiceAddress.equals(data.dstBroker()))) {
+                var startTimeNs = System.nanoTime();
+                futures.computeIfAbsent(serviceUnit, ignore -> {
+                    var future = new CompletableFuture<Void>();
+                    future.completeOnTimeout(null, OP_TIMEOUT_NS, 
TimeUnit.NANOSECONDS).
+                            thenAccept(__ -> {
+                                var durationMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs);
+                                log.info("Operation {} for service unit {} 
took {} ms", this, serviceUnit, durationMs);
+                                histogram.labels(lookupServiceAddress, 
"bundleUnloading").observe(durationMs);
+                            }).whenComplete((__, throwable) -> 
futures.remove(serviceUnit, future));
+                    return future;
+                });
+            }
+        }
+
+        public void endMeasurement(String serviceUnit) {
+            var future = futures.get(serviceUnit);
+            if (future != null) {
+                future.complete(null);
+            }
+        }
+    }
+
+    public UnloadManager(UnloadCounter counter, String lookupServiceAddress) {
         this.counter = counter;
-        this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+        this.lookupServiceAddress = 
Objects.requireNonNull(lookupServiceAddress);
+        inFlightUnloadRequest = new ConcurrentHashMap<>();
     }
 
     private void complete(String serviceUnit, Throwable ex) {
+        LatencyMetric.UNLOAD.endMeasurement(serviceUnit);
+        LatencyMetric.DISCONNECT.endMeasurement(serviceUnit);
+        if (ex != null) {
+            LatencyMetric.RELEASE.endMeasurement(serviceUnit);
+            LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
+        }
+
         inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
             if (!future.isDone()) {
                 if (ex != null) {
@@ -62,7 +129,6 @@ public class UnloadManager implements StateChangeListener {
                                              UnloadDecision decision,
                                              long timeout,
                                              TimeUnit timeoutUnit) {
-
         return eventPubFuture.thenCompose(__ -> 
inFlightUnloadRequest.computeIfAbsent(bundle, ignore -> {
             if (log.isDebugEnabled()) {
                 log.debug("Handle unload bundle: {}, timeout: {} {}", bundle, 
timeout, timeoutUnit);
@@ -86,23 +152,40 @@ public class UnloadManager implements StateChangeListener {
         });
     }
 
+    @Override
+    public void beforeEvent(String serviceUnit, ServiceUnitStateData data) {
+        if (log.isDebugEnabled()) {
+            log.debug("Handling arrival of {} for service unit {}", data, 
serviceUnit);
+        }
+        ServiceUnitState state = ServiceUnitStateData.state(data);
+        switch (state) {
+            case Free, Owned -> 
LatencyMetric.DISCONNECT.beginMeasurement(serviceUnit, lookupServiceAddress, 
data);
+            case Releasing -> {
+                LatencyMetric.RELEASE.beginMeasurement(serviceUnit, 
lookupServiceAddress, data);
+                LatencyMetric.UNLOAD.beginMeasurement(serviceUnit, 
lookupServiceAddress, data);
+            }
+            case Assigning -> 
LatencyMetric.ASSIGN.beginMeasurement(serviceUnit, lookupServiceAddress, data);
+        }
+    }
+
     @Override
     public void handleEvent(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
-        if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+        if (t != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Handling {} for service unit {} with exception.", 
data, serviceUnit, t);
             }
-            this.complete(serviceUnit, t);
+            complete(serviceUnit, t);
             return;
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Handling {} for service unit {}", data, serviceUnit);
+        }
         ServiceUnitState state = ServiceUnitStateData.state(data);
         switch (state) {
-            case Free, Owned -> this.complete(serviceUnit, t);
-            default -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("Handling {} for service unit {}", data, 
serviceUnit);
-                }
-            }
+            case Free, Owned -> complete(serviceUnit, t);
+            case Releasing -> 
LatencyMetric.RELEASE.endMeasurement(serviceUnit);
+            case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
         }
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
index 4a5d41f7576..72be586e465 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
@@ -108,7 +108,6 @@ public class UnloadCounter {
     }
 
     public List<Metrics> toMetrics(String advertisedBrokerAddress) {
-
         var metrics = new ArrayList<Metrics>();
         var dimensions = new HashMap<String, String>();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4e3eb9fd4ad..bd4917da3b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1788,15 +1788,18 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             printSendCommandDebug(send, headersAndPayload);
         }
 
-        PulsarService pulsar = getBrokerService().pulsar();
-        // if the topic is transferring, we ignore send msg.
+        // New messages are silently ignored during topic transfer. Note that 
the transferring flag is only set when the
+        // Extensible Load Manager is enabled.
         if (producer.getTopic().isTransferring()) {
-            long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
-                    
.getIgnoredSendMsgCounter().addAndGet(send.getNumMessages());
+            var pulsar = getBrokerService().pulsar();
+            var ignoredMsgCount = send.getNumMessages();
+            var ignoredSendMsgTotalCount = 
ExtensibleLoadManagerImpl.get(pulsar).getIgnoredSendMsgCount().
+                    addAndGet(ignoredMsgCount);
             if (log.isDebugEnabled()) {
-                log.debug("Ignored send msg from:{}:{} to fenced topic:{} 
while transferring."
-                                + " Ignored message count:{}.",
-                        remoteAddress, send.getProducerId(), 
producer.getTopic().getName(), ignoredMsgCount);
+                log.debug("Ignoring {} messages from:{}:{} to fenced topic:{} 
while transferring."
+                                + " Total ignored message count: {}.",
+                        ignoredMsgCount, remoteAddress, send.getProducerId(), 
producer.getTopic().getName(),
+                        ignoredSendMsgTotalCount);
             }
             return;
         }
@@ -1869,11 +1872,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
             Subscription subscription = consumer.getSubscription();
+            // Message acks are silently ignored during topic transfer. Note 
that the transferring flag is only set when
+            // the Extensible Load Manager is enabled.
             if (subscription.getTopic().isTransferring()) {
-                // Message acks are silently ignored during topic transfer.
+                var pulsar = getBrokerService().getPulsar();
+                var ignoredAckCount = ack.getMessageIdsCount();
+                var ignoredAckTotalCount = 
ExtensibleLoadManagerImpl.get(pulsar).getIgnoredAckCount().
+                        addAndGet(ignoredAckCount);
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] [{}] Ignoring message acknowledgment 
during topic transfer, ack count: {}",
-                            subscription, consumerId, 
ack.getMessageIdsCount());
+                    log.debug("[{}] [{}] Ignoring {} message acks during topic 
transfer. Total ignored ack count: {}",
+                            subscription, consumerId, ignoredAckCount, 
ignoredAckTotalCount);
                 }
                 return;
             }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
index ba640761242..cb4a33a7294 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
@@ -45,7 +45,7 @@ public class Summary extends SimpleCollector<Summary.Child> 
implements Collector
         }
     }
 
-    static class Child {
+    public static class Child {
         private final DataSketchesSummaryLogger logger;
         private final List<Double> quantiles;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index bb7416ddc41..11d1ef900da 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1241,6 +1241,12 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
             FieldUtils.writeDeclaredField(channel1, "handlerCounters", 
handlerCounters, true);
         }
 
+        primaryLoadManager.getIgnoredSendMsgCount().incrementAndGet();
+        primaryLoadManager.getIgnoredSendMsgCount().incrementAndGet();
+        primaryLoadManager.getIgnoredAckCount().incrementAndGet();
+        primaryLoadManager.getIgnoredAckCount().incrementAndGet();
+        primaryLoadManager.getIgnoredAckCount().incrementAndGet();
+
         var expected = Set.of(
                 """
                         dimensions=[{broker=localhost, metric=loadBalancing}], 
metrics=[{brk_lb_bandwidth_in_usage=3.0, brk_lb_bandwidth_out_usage=4.0, 
brk_lb_cpu_usage=1.0, brk_lb_directMemory_usage=2.0, brk_lb_memory_usage=400.0}]
@@ -1311,8 +1317,9 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                         dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Schedule}], 
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]
                         dimensions=[{broker=localhost, metric=sunitStateChn, 
result=Success}], 
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1}]
                         dimensions=[{broker=localhost, metric=sunitStateChn}], 
metrics=[{brk_sunit_state_chn_orphan_su_cleanup_ops_total=3, 
brk_sunit_state_chn_owned_su_total=10, 
brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
+                        dimensions=[{broker=localhost, 
metric=bundleUnloading}], metrics=[{brk_lb_ignored_ack_total=3, 
brk_lb_ignored_send_total=2}]
                         """.split("\n"));
-        var actual = primaryLoadManager.getMetrics().stream().map(m -> 
m.toString()).collect(Collectors.toSet());
+        var actual = 
primaryLoadManager.getMetrics().stream().map(Metrics::toString).collect(Collectors.toSet());
         assertEquals(actual, expected);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
index 6a2ae1cc562..ac7edb3456d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -26,7 +26,6 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -49,7 +48,7 @@ public class UnloadManagerTest {
     @Test
     public void testEventPubFutureHasException() {
         UnloadCounter counter = new UnloadCounter();
-        UnloadManager manager = new UnloadManager(counter);
+        UnloadManager manager = new UnloadManager(counter, 
"mockLookupServiceAddress");
         var unloadDecision =
                 new UnloadDecision(new Unload("broker-1", "bundle-1"), 
Success, Admin);
         CompletableFuture<Void> future =
@@ -69,7 +68,7 @@ public class UnloadManagerTest {
     @Test
     public void testTimeout() throws IllegalAccessException {
         UnloadCounter counter = new UnloadCounter();
-        UnloadManager manager = new UnloadManager(counter);
+        UnloadManager manager = new UnloadManager(counter, 
"mockLookupServiceAddress");
         var unloadDecision =
                 new UnloadDecision(new Unload("broker-1", "bundle-1"), 
Success, Admin);
         CompletableFuture<Void> future =
@@ -93,7 +92,7 @@ public class UnloadManagerTest {
     @Test
     public void testSuccess() throws IllegalAccessException, 
ExecutionException, InterruptedException {
         UnloadCounter counter = new UnloadCounter();
-        UnloadManager manager = new UnloadManager(counter);
+        UnloadManager manager = new UnloadManager(counter, 
"mockLookupServiceAddress");
         var unloadDecision =
                 new UnloadDecision(new Unload("broker-1", "bundle-1"), 
Success, Admin);
         CompletableFuture<Void> future =
@@ -147,7 +146,7 @@ public class UnloadManagerTest {
     @Test
     public void testFailedStage() throws IllegalAccessException {
         UnloadCounter counter = new UnloadCounter();
-        UnloadManager manager = new UnloadManager(counter);
+        UnloadManager manager = new UnloadManager(counter, 
"mockLookupServiceAddress");
         var unloadDecision =
                 new UnloadDecision(new Unload("broker-1", "bundle-1"), 
Success, Admin);
         CompletableFuture<Void> future =
@@ -176,7 +175,7 @@ public class UnloadManagerTest {
     @Test
     public void testClose() throws IllegalAccessException {
         UnloadCounter counter = new UnloadCounter();
-        UnloadManager manager = new UnloadManager(counter);
+        UnloadManager manager = new UnloadManager(counter, 
"mockLookupServiceAddress");
         var unloadDecision =
                 new UnloadDecision(new Unload("broker-1", "bundle-1"), 
Success, Admin);
         CompletableFuture<Void> future =
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index abd00d374f3..476cf3f9b4a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -66,6 +66,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -773,9 +775,19 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         mockZooKeeper.create(mockedBroker, new byte[]{0}, 
Collections.emptyList(), CreateMode.EPHEMERAL);
 
         pulsar.getBrokerService().updateRates();
-        Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
+        Awaitility.await().until(() -> 
!pulsar.getBrokerService().getBundleStats().isEmpty());
         ModularLoadManagerWrapper loadManager = 
(ModularLoadManagerWrapper)pulsar.getLoadManager().get();
         loadManager.getLoadManager().updateLocalBrokerData();
+        // Force registration of UnloadManager load balance stats
+        for (var latencyMetric : UnloadManager.LatencyMetric.values()) {
+            var serviceUnit = "serviceUnit";
+            var brokerLookupAddress = "lookupAddress";
+            var serviceUnitStateData = 
Mockito.mock(ServiceUnitStateData.class);
+            
Mockito.when(serviceUnitStateData.sourceBroker()).thenReturn(brokerLookupAddress);
+            
Mockito.when(serviceUnitStateData.dstBroker()).thenReturn(brokerLookupAddress);
+            latencyMetric.beginMeasurement(serviceUnit, brokerLookupAddress, 
serviceUnitStateData);
+            latencyMetric.endMeasurement(serviceUnit);
+        }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, false, false, 
statsOut);
@@ -797,6 +809,11 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
 
+        assertTrue(metrics.containsKey("brk_lb_unload_latency_ms_bucket"));
+        assertTrue(metrics.containsKey("brk_lb_release_latency_ms_bucket"));
+        assertTrue(metrics.containsKey("brk_lb_assign_latency_ms_bucket"));
+        assertTrue(metrics.containsKey("brk_lb_disconnect_latency_ms_bucket"));
+
         // cleanup.
         mockZooKeeper.delete(mockedBroker, 0);
     }

Reply via email to