This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 28ed48e8138 [improve][broker] PIP-307: Add monitoring metrics for
graceful closure of producers/consumers (#21854)
28ed48e8138 is described below
commit 28ed48e813843c68e369b1d618de1e7455b191de
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jan 9 16:42:43 2024 -0800
[improve][broker] PIP-307: Add monitoring metrics for graceful closure of
producers/consumers (#21854)
---
.../extensions/ExtensibleLoadManagerImpl.java | 16 +++-
.../channel/ServiceUnitStateChannelImpl.java | 1 -
.../extensions/channel/StateChangeListeners.java | 16 +++-
.../extensions/manager/StateChangeListener.java | 10 +-
.../extensions/manager/UnloadManager.java | 105 ++++++++++++++++++---
.../extensions/models/UnloadCounter.java | 1 -
.../apache/pulsar/broker/service/ServerCnx.java | 28 ++++--
.../broker/stats/prometheus/metrics/Summary.java | 2 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 9 +-
.../extensions/manager/UnloadManagerTest.java | 11 +--
.../pulsar/broker/stats/PrometheusMetricsTest.java | 19 +++-
11 files changed, 181 insertions(+), 37 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 581183cf95a..4664a840c92 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -181,7 +181,9 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
// Record the ignored send msg count during unloading
@Getter
- private final AtomicLong ignoredSendMsgCounter = new AtomicLong();
+ private final AtomicLong ignoredSendMsgCount = new AtomicLong();
+ @Getter
+ private final AtomicLong ignoredAckCount = new AtomicLong();
// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new
AtomicReference<>();
@@ -361,7 +363,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
this.serviceUnitStateChannel = new
ServiceUnitStateChannelImpl(pulsar);
this.brokerRegistry.start();
this.splitManager = new SplitManager(splitCounter);
- this.unloadManager = new UnloadManager(unloadCounter);
+ this.unloadManager = new UnloadManager(unloadCounter,
pulsar.getLookupServiceAddress());
this.serviceUnitStateChannel.listen(unloadManager);
this.serviceUnitStateChannel.listen(splitManager);
this.leaderElectionService.start();
@@ -874,12 +876,20 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager {
}
metricsCollection.addAll(this.assignCounter.toMetrics(pulsar.getAdvertisedAddress()));
-
metricsCollection.addAll(this.serviceUnitStateChannel.getMetrics());
+
metricsCollection.addAll(getIgnoredCommandMetrics(pulsar.getAdvertisedAddress()));
return metricsCollection;
}
+ private List<Metrics> getIgnoredCommandMetrics(String
advertisedBrokerAddress) {
+ var dimensions = Map.of("broker", advertisedBrokerAddress, "metric",
"bundleUnloading");
+ var metric = Metrics.create(dimensions);
+ metric.put("brk_lb_ignored_ack_total", ignoredAckCount.get());
+ metric.put("brk_lb_ignored_send_total", ignoredSendMsgCount.get());
+ return List.of(metric);
+ }
+
private void monitor() {
try {
initWaiter.await();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index bd571284346..118667d2575 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -160,7 +160,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
Split,
Unload,
Override
-
}
@Getter
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
index 1d396f500b6..f0d99e931bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/StateChangeListeners.java
@@ -51,7 +51,21 @@ public class StateChangeListeners {
public <T> CompletableFuture<T> notifyOnCompletion(CompletableFuture<T>
future,
String serviceUnit,
ServiceUnitStateData
data) {
- return future.whenComplete((r, ex) -> notify(serviceUnit, data, ex));
+ return notifyOnArrival(serviceUnit, data).
+ thenCombine(future, (unused, t) -> t).
+ whenComplete((r, ex) -> notify(serviceUnit, data, ex));
+ }
+
+ private CompletableFuture<Void> notifyOnArrival(String serviceUnit,
ServiceUnitStateData data) {
+ stateChangeListeners.forEach(listener -> {
+ try {
+ listener.beforeEvent(serviceUnit, data);
+ } catch (Throwable ex) {
+ log.error("StateChangeListener: {} exception while notifying
arrival event {} for service unit {}",
+ listener, data, serviceUnit, ex);
+ }
+ });
+ return CompletableFuture.completedFuture(null);
}
public void notify(String serviceUnit, ServiceUnitStateData data,
Throwable t) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
index 7ba8be8771b..0d26859f82e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/StateChangeListener.java
@@ -23,7 +23,15 @@ import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateD
public interface StateChangeListener {
/**
- * Handle the service unit state change.
+ * Called before the state change is handled.
+ *
+ * @param serviceUnit - Service Unit(Namespace bundle).
+ * @param data - Service unit state data.
+ */
+ default void beforeEvent(String serviceUnit, ServiceUnitStateData data) { }
+
+ /**
+ * Called after the service unit state change has been handled.
*
* @param serviceUnit - Service Unit(Namespace bundle).
* @param data - Service unit state data.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index ffdbbc2af42..a613d48575d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -20,7 +20,10 @@ package
org.apache.pulsar.broker.loadbalance.extensions.manager;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
+import com.google.common.annotations.VisibleForTesting;
+import io.prometheus.client.Histogram;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -38,13 +41,77 @@ public class UnloadManager implements StateChangeListener {
private final UnloadCounter counter;
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+ private final String lookupServiceAddress;
- public UnloadManager(UnloadCounter counter) {
+ @VisibleForTesting
+ public enum LatencyMetric {
+ UNLOAD(buildHistogram(
+ "brk_lb_unload_latency", "Total time duration of unload operations
on source brokers"), true, false),
+ ASSIGN(buildHistogram(
+ "brk_lb_assign_latency", "Time spent in the load balancing ASSIGN
state on destination brokers"),
+ false, true),
+ RELEASE(buildHistogram(
+ "brk_lb_release_latency", "Time spent in the load balancing
RELEASE state on source brokers"), true, false),
+ DISCONNECT(buildHistogram(
+ "brk_lb_disconnect_latency", "Time spent in the load balancing
disconnected state on source brokers"),
+ true, false);
+
+ private static Histogram buildHistogram(String name, String help) {
+ return Histogram.build(name, help).unit("ms").labelNames("broker",
"metric").
+ buckets(new double[] {1.0, 10.0, 100.0, 200.0,
1000.0}).register();
+ }
+ private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);
+
+ private final Histogram histogram;
+ private final Map<String, CompletableFuture<Void>> futures = new
ConcurrentHashMap<>();
+ private final boolean isSourceBrokerMetric;
+ private final boolean isDestinationBrokerMetric;
+
+ LatencyMetric(Histogram histogram, boolean isSourceBrokerMetric,
boolean isDestinationBrokerMetric) {
+ this.histogram = histogram;
+ this.isSourceBrokerMetric = isSourceBrokerMetric;
+ this.isDestinationBrokerMetric = isDestinationBrokerMetric;
+ }
+
+ public void beginMeasurement(String serviceUnit, String
lookupServiceAddress, ServiceUnitStateData data) {
+ if ((isSourceBrokerMetric &&
lookupServiceAddress.equals(data.sourceBroker()))
+ || (isDestinationBrokerMetric &&
lookupServiceAddress.equals(data.dstBroker()))) {
+ var startTimeNs = System.nanoTime();
+ futures.computeIfAbsent(serviceUnit, ignore -> {
+ var future = new CompletableFuture<Void>();
+ future.completeOnTimeout(null, OP_TIMEOUT_NS,
TimeUnit.NANOSECONDS).
+ thenAccept(__ -> {
+ var durationMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs);
+ log.info("Operation {} for service unit {}
took {} ms", this, serviceUnit, durationMs);
+ histogram.labels(lookupServiceAddress,
"bundleUnloading").observe(durationMs);
+ }).whenComplete((__, throwable) ->
futures.remove(serviceUnit, future));
+ return future;
+ });
+ }
+ }
+
+ public void endMeasurement(String serviceUnit) {
+ var future = futures.get(serviceUnit);
+ if (future != null) {
+ future.complete(null);
+ }
+ }
+ }
+
+ public UnloadManager(UnloadCounter counter, String lookupServiceAddress) {
this.counter = counter;
- this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+ this.lookupServiceAddress =
Objects.requireNonNull(lookupServiceAddress);
+ inFlightUnloadRequest = new ConcurrentHashMap<>();
}
private void complete(String serviceUnit, Throwable ex) {
+ LatencyMetric.UNLOAD.endMeasurement(serviceUnit);
+ LatencyMetric.DISCONNECT.endMeasurement(serviceUnit);
+ if (ex != null) {
+ LatencyMetric.RELEASE.endMeasurement(serviceUnit);
+ LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
+ }
+
inFlightUnloadRequest.computeIfPresent(serviceUnit, (__, future) -> {
if (!future.isDone()) {
if (ex != null) {
@@ -62,7 +129,6 @@ public class UnloadManager implements StateChangeListener {
UnloadDecision decision,
long timeout,
TimeUnit timeoutUnit) {
-
return eventPubFuture.thenCompose(__ ->
inFlightUnloadRequest.computeIfAbsent(bundle, ignore -> {
if (log.isDebugEnabled()) {
log.debug("Handle unload bundle: {}, timeout: {} {}", bundle,
timeout, timeoutUnit);
@@ -86,23 +152,40 @@ public class UnloadManager implements StateChangeListener {
});
}
+ @Override
+ public void beforeEvent(String serviceUnit, ServiceUnitStateData data) {
+ if (log.isDebugEnabled()) {
+ log.debug("Handling arrival of {} for service unit {}", data,
serviceUnit);
+ }
+ ServiceUnitState state = ServiceUnitStateData.state(data);
+ switch (state) {
+ case Free, Owned ->
LatencyMetric.DISCONNECT.beginMeasurement(serviceUnit, lookupServiceAddress,
data);
+ case Releasing -> {
+ LatencyMetric.RELEASE.beginMeasurement(serviceUnit,
lookupServiceAddress, data);
+ LatencyMetric.UNLOAD.beginMeasurement(serviceUnit,
lookupServiceAddress, data);
+ }
+ case Assigning ->
LatencyMetric.ASSIGN.beginMeasurement(serviceUnit, lookupServiceAddress, data);
+ }
+ }
+
@Override
public void handleEvent(String serviceUnit, ServiceUnitStateData data,
Throwable t) {
- if (t != null && inFlightUnloadRequest.containsKey(serviceUnit)) {
+ if (t != null) {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.",
data, serviceUnit, t);
}
- this.complete(serviceUnit, t);
+ complete(serviceUnit, t);
return;
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Handling {} for service unit {}", data, serviceUnit);
+ }
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
- case Free, Owned -> this.complete(serviceUnit, t);
- default -> {
- if (log.isDebugEnabled()) {
- log.debug("Handling {} for service unit {}", data,
serviceUnit);
- }
- }
+ case Free, Owned -> complete(serviceUnit, t);
+ case Releasing ->
LatencyMetric.RELEASE.endMeasurement(serviceUnit);
+ case Assigning -> LatencyMetric.ASSIGN.endMeasurement(serviceUnit);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
index 4a5d41f7576..72be586e465 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/UnloadCounter.java
@@ -108,7 +108,6 @@ public class UnloadCounter {
}
public List<Metrics> toMetrics(String advertisedBrokerAddress) {
-
var metrics = new ArrayList<Metrics>();
var dimensions = new HashMap<String, String>();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4e3eb9fd4ad..bd4917da3b1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1788,15 +1788,18 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
printSendCommandDebug(send, headersAndPayload);
}
- PulsarService pulsar = getBrokerService().pulsar();
- // if the topic is transferring, we ignore send msg.
+ // New messages are silently ignored during topic transfer. Note that
the transferring flag is only set when the
+ // Extensible Load Manager is enabled.
if (producer.getTopic().isTransferring()) {
- long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
-
.getIgnoredSendMsgCounter().addAndGet(send.getNumMessages());
+ var pulsar = getBrokerService().pulsar();
+ var ignoredMsgCount = send.getNumMessages();
+ var ignoredSendMsgTotalCount =
ExtensibleLoadManagerImpl.get(pulsar).getIgnoredSendMsgCount().
+ addAndGet(ignoredMsgCount);
if (log.isDebugEnabled()) {
- log.debug("Ignored send msg from:{}:{} to fenced topic:{}
while transferring."
- + " Ignored message count:{}.",
- remoteAddress, send.getProducerId(),
producer.getTopic().getName(), ignoredMsgCount);
+ log.debug("Ignoring {} messages from:{}:{} to fenced topic:{}
while transferring."
+ + " Total ignored message count: {}.",
+ ignoredMsgCount, remoteAddress, send.getProducerId(),
producer.getTopic().getName(),
+ ignoredSendMsgTotalCount);
}
return;
}
@@ -1869,11 +1872,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (consumerFuture != null && consumerFuture.isDone() &&
!consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
Subscription subscription = consumer.getSubscription();
+ // Message acks are silently ignored during topic transfer. Note
that the transferring flag is only set when
+ // the Extensible Load Manager is enabled.
if (subscription.getTopic().isTransferring()) {
- // Message acks are silently ignored during topic transfer.
+ var pulsar = getBrokerService().getPulsar();
+ var ignoredAckCount = ack.getMessageIdsCount();
+ var ignoredAckTotalCount =
ExtensibleLoadManagerImpl.get(pulsar).getIgnoredAckCount().
+ addAndGet(ignoredAckCount);
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Ignoring message acknowledgment
during topic transfer, ack count: {}",
- subscription, consumerId,
ack.getMessageIdsCount());
+ log.debug("[{}] [{}] Ignoring {} message acks during topic
transfer. Total ignored ack count: {}",
+ subscription, consumerId, ignoredAckCount,
ignoredAckTotalCount);
}
return;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
index ba640761242..cb4a33a7294 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/Summary.java
@@ -45,7 +45,7 @@ public class Summary extends SimpleCollector<Summary.Child>
implements Collector
}
}
- static class Child {
+ public static class Child {
private final DataSketchesSummaryLogger logger;
private final List<Double> quantiles;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index bb7416ddc41..11d1ef900da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1241,6 +1241,12 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
FieldUtils.writeDeclaredField(channel1, "handlerCounters",
handlerCounters, true);
}
+ primaryLoadManager.getIgnoredSendMsgCount().incrementAndGet();
+ primaryLoadManager.getIgnoredSendMsgCount().incrementAndGet();
+ primaryLoadManager.getIgnoredAckCount().incrementAndGet();
+ primaryLoadManager.getIgnoredAckCount().incrementAndGet();
+ primaryLoadManager.getIgnoredAckCount().incrementAndGet();
+
var expected = Set.of(
"""
dimensions=[{broker=localhost, metric=loadBalancing}],
metrics=[{brk_lb_bandwidth_in_usage=3.0, brk_lb_bandwidth_out_usage=4.0,
brk_lb_cpu_usage=1.0, brk_lb_directMemory_usage=2.0, brk_lb_memory_usage=400.0}]
@@ -1311,8 +1317,9 @@ public class ExtensibleLoadManagerImplTest extends
MockedPulsarServiceBaseTest {
dimensions=[{broker=localhost, metric=sunitStateChn,
result=Schedule}],
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=5}]
dimensions=[{broker=localhost, metric=sunitStateChn,
result=Success}],
metrics=[{brk_sunit_state_chn_inactive_broker_cleanup_ops_total=1}]
dimensions=[{broker=localhost, metric=sunitStateChn}],
metrics=[{brk_sunit_state_chn_orphan_su_cleanup_ops_total=3,
brk_sunit_state_chn_owned_su_total=10,
brk_sunit_state_chn_su_tombstone_cleanup_ops_total=2}]
+ dimensions=[{broker=localhost,
metric=bundleUnloading}], metrics=[{brk_lb_ignored_ack_total=3,
brk_lb_ignored_send_total=2}]
""".split("\n"));
- var actual = primaryLoadManager.getMetrics().stream().map(m ->
m.toString()).collect(Collectors.toSet());
+ var actual =
primaryLoadManager.getMetrics().stream().map(Metrics::toString).collect(Collectors.toSet());
assertEquals(actual, expected);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
index 6a2ae1cc562..ac7edb3456d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -26,7 +26,6 @@ import static
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -49,7 +48,7 @@ public class UnloadManagerTest {
@Test
public void testEventPubFutureHasException() {
UnloadCounter counter = new UnloadCounter();
- UnloadManager manager = new UnloadManager(counter);
+ UnloadManager manager = new UnloadManager(counter,
"mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"),
Success, Admin);
CompletableFuture<Void> future =
@@ -69,7 +68,7 @@ public class UnloadManagerTest {
@Test
public void testTimeout() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
- UnloadManager manager = new UnloadManager(counter);
+ UnloadManager manager = new UnloadManager(counter,
"mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"),
Success, Admin);
CompletableFuture<Void> future =
@@ -93,7 +92,7 @@ public class UnloadManagerTest {
@Test
public void testSuccess() throws IllegalAccessException,
ExecutionException, InterruptedException {
UnloadCounter counter = new UnloadCounter();
- UnloadManager manager = new UnloadManager(counter);
+ UnloadManager manager = new UnloadManager(counter,
"mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"),
Success, Admin);
CompletableFuture<Void> future =
@@ -147,7 +146,7 @@ public class UnloadManagerTest {
@Test
public void testFailedStage() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
- UnloadManager manager = new UnloadManager(counter);
+ UnloadManager manager = new UnloadManager(counter,
"mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"),
Success, Admin);
CompletableFuture<Void> future =
@@ -176,7 +175,7 @@ public class UnloadManagerTest {
@Test
public void testClose() throws IllegalAccessException {
UnloadCounter counter = new UnloadCounter();
- UnloadManager manager = new UnloadManager(counter);
+ UnloadManager manager = new UnloadManager(counter,
"mockLookupServiceAddress");
var unloadDecision =
new UnloadDecision(new Unload("broker-1", "bundle-1"),
Success, Admin);
CompletableFuture<Void> future =
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index abd00d374f3..476cf3f9b4a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -66,6 +66,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
+import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerTestBase;
@@ -773,9 +775,19 @@ public class PrometheusMetricsTest extends BrokerTestBase {
mockZooKeeper.create(mockedBroker, new byte[]{0},
Collections.emptyList(), CreateMode.EPHEMERAL);
pulsar.getBrokerService().updateRates();
- Awaitility.await().untilAsserted(() ->
assertTrue(pulsar.getBrokerService().getBundleStats().size() > 0));
+ Awaitility.await().until(() ->
!pulsar.getBrokerService().getBundleStats().isEmpty());
ModularLoadManagerWrapper loadManager =
(ModularLoadManagerWrapper)pulsar.getLoadManager().get();
loadManager.getLoadManager().updateLocalBrokerData();
+ // Force registration of UnloadManager load balance stats
+ for (var latencyMetric : UnloadManager.LatencyMetric.values()) {
+ var serviceUnit = "serviceUnit";
+ var brokerLookupAddress = "lookupAddress";
+ var serviceUnitStateData =
Mockito.mock(ServiceUnitStateData.class);
+
Mockito.when(serviceUnitStateData.sourceBroker()).thenReturn(brokerLookupAddress);
+
Mockito.when(serviceUnitStateData.dstBroker()).thenReturn(brokerLookupAddress);
+ latencyMetric.beginMeasurement(serviceUnit, brokerLookupAddress,
serviceUnitStateData);
+ latencyMetric.endMeasurement(serviceUnit);
+ }
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, false, false, false,
statsOut);
@@ -797,6 +809,11 @@ public class PrometheusMetricsTest extends BrokerTestBase {
assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
+ assertTrue(metrics.containsKey("brk_lb_unload_latency_ms_bucket"));
+ assertTrue(metrics.containsKey("brk_lb_release_latency_ms_bucket"));
+ assertTrue(metrics.containsKey("brk_lb_assign_latency_ms_bucket"));
+ assertTrue(metrics.containsKey("brk_lb_disconnect_latency_ms_bucket"));
+
// cleanup.
mockZooKeeper.delete(mockedBroker, 0);
}