heesung-sn commented on code in PR #21854:
URL: https://github.com/apache/pulsar/pull/21854#discussion_r1442213541
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -874,12 +876,20 @@ public List<Metrics> getMetrics() {
}
metricsCollection.addAll(this.assignCounter.toMetrics(pulsar.getAdvertisedAddress()));
-
metricsCollection.addAll(this.serviceUnitStateChannel.getMetrics());
+ metricsCollection.addAll(toMetrics(pulsar.getAdvertisedAddress()));
return metricsCollection;
}
+ private List<Metrics> toMetrics(String advertisedBrokerAddress) {
+ var dimensions = Map.of("broker", advertisedBrokerAddress, "metric",
"bundleReleasing");
Review Comment:
Should we use "bundleUnloading" as the metric name here, consistent with the
other unload metrics?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -92,17 +153,35 @@ public void handleEvent(String serviceUnit,
ServiceUnitStateData data, Throwable
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {} with exception.",
data, serviceUnit, t);
}
- this.complete(serviceUnit, t);
+ complete(serviceUnit, t);
return;
}
+
+ if (log.isDebugEnabled()) {
+ log.debug("Handling {} for service unit {}", data, serviceUnit);
+ }
ServiceUnitState state = ServiceUnitStateData.state(data);
switch (state) {
- case Free, Owned -> this.complete(serviceUnit, t);
- default -> {
- if (log.isDebugEnabled()) {
- log.debug("Handling {} for service unit {}", data,
serviceUnit);
- }
- }
+ case Free, Owned -> complete(serviceUnit, t);
+ case Releasing -> recordReleaseLatency(serviceUnit, data);
+ case Assigning -> recordAssigningLatency(serviceUnit, data);
+ }
+ }
+
+ private void recordReleaseLatency(String serviceUnit, ServiceUnitStateData
data) {
+ if (lookupServiceAddress.equals(data.sourceBroker())) {
+ releaseLatency.beginMeasurement(serviceUnit);
+ unloadLatency.beginMeasurement(serviceUnit);
Review Comment:
Which broker should measure these events and emit the metrics? In this PR,
both the source and destination brokers appear to measure them. Should only
the leader observe these events?
From a graphing perspective, if multiple brokers emit these quantile metrics,
users would have to aggregate the quantiles across all brokers themselves to
plot the global unloading latency quantiles.
I am not sure `Summary` quantiles can be aggregated that way. As far as I
remember, we might need a `Histogram` for this kind of quantile aggregation
(however, I couldn't find any `Histogram` examples in Pulsar).
ref:
https://prometheus.io/docs/practices/histograms/#histograms-and-summaries
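To illustrate the aggregation concern, here is a minimal, self-contained sketch in plain Java. It is not Pulsar or Prometheus client code; the class name, bucket bounds, and helper methods are all hypothetical. It shows that cumulative histogram buckets from several brokers can be summed element-wise and a quantile bound estimated from the merged buckets, whereas precomputed per-broker quantiles carry no counts that could be merged.

```java
/** Hypothetical illustration only; not Pulsar or Prometheus client code. */
final class QuantileAggregationSketch {

    // Assumed bucket upper bounds in milliseconds, analogous to Prometheus "le" labels.
    static final double[] BOUNDS_MS = {10, 50, 100, 500, 1000, Double.POSITIVE_INFINITY};

    /** Builds cumulative bucket counts from one broker's observed latencies. */
    static long[] toBuckets(double[] latenciesMs) {
        long[] counts = new long[BOUNDS_MS.length];
        for (double v : latenciesMs) {
            for (int i = 0; i < BOUNDS_MS.length; i++) {
                if (v <= BOUNDS_MS[i]) {
                    counts[i]++; // cumulative: every bucket whose bound covers v
                }
            }
        }
        return counts;
    }

    /** Buckets from different brokers merge by element-wise addition. */
    static long[] merge(long[] a, long[] b) {
        long[] merged = new long[a.length];
        for (int i = 0; i < a.length; i++) {
            merged[i] = a[i] + b[i];
        }
        return merged;
    }

    /** Returns the smallest bucket bound covering quantile q (a coarse estimate). */
    static double quantileUpperBound(long[] cumulative, double q) {
        long total = cumulative[cumulative.length - 1];
        if (total == 0) {
            return Double.NaN; // no observations
        }
        long rank = (long) Math.ceil(q * total);
        for (int i = 0; i < cumulative.length; i++) {
            if (cumulative[i] >= rank) {
                return BOUNDS_MS[i];
            }
        }
        return Double.POSITIVE_INFINITY;
    }
}
```

For example, if broker A observes four unloads of 5 ms and broker B observes unloads of 800 ms and 900 ms, the merged buckets place the global median in the 10 ms bucket and p99 in the 1000 ms bucket. Averaging the two brokers' own p99 values would not recover either number.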
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -38,10 +41,63 @@ public class UnloadManager implements StateChangeListener {
private final UnloadCounter counter;
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+ private final String lookupServiceAddress;
+ private final LatencyMetric unloadLatency;
+ private final LatencyMetric assignLatency;
+ private final LatencyMetric releaseLatency;
- public UnloadManager(UnloadCounter counter) {
+
+ private class LatencyMetric {
+
+ private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);
+ private static final double QUANTILES[] = {0.0, 0.50, 0.95, 0.99,
0.999, 0.9999, 1.0};
+ private static final String LABEL_NAMES[] = {"broker"};
Review Comment:
Should we use "bundleUnloading" as the metric name, similar to the other
unload metrics? Or is the `broker` label required here?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -797,6 +804,10 @@ public void testBundlesMetrics() throws Exception {
assertTrue(metrics.containsKey("pulsar_lb_bundles_split_total"));
+ assertTrue(metrics.containsKey("brk_lb_unload_latency"));
Review Comment:
I assume these latency metrics are automatically wired into the broker
metrics (without having to emit them explicitly).
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java:
##########
@@ -874,12 +876,20 @@ public List<Metrics> getMetrics() {
}
metricsCollection.addAll(this.assignCounter.toMetrics(pulsar.getAdvertisedAddress()));
-
metricsCollection.addAll(this.serviceUnitStateChannel.getMetrics());
+ metricsCollection.addAll(toMetrics(pulsar.getAdvertisedAddress()));
return metricsCollection;
}
+ private List<Metrics> toMetrics(String advertisedBrokerAddress) {
Review Comment:
nit: `getIgnoredCommandMetrics`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -38,10 +41,81 @@ public class UnloadManager implements StateChangeListener {
private final UnloadCounter counter;
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+ private final String lookupServiceAddress;
- public UnloadManager(UnloadCounter counter) {
+ private static final Summary unloadLatency =
+ Summary.build("brk_lb_unload_latency", "Total time duration of
unload operations")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
+
+ private static final Summary releaseLatency =
+ Summary.build("brk_lb_release_latency", "Time spent in the load
balancing RELEASE state")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
+ private static final Summary assignLatency =
+ Summary.build("brk_lb_assign_latency", "Time spent in the load
balancing ASSIGN state")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
+ private enum LatencyMetric {
+ UNLOAD(unloadLatency), RELEASE(releaseLatency), ASSIGN(assignLatency);
+
+ private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);
+
+ private final Summary summary;
+ private final Map<String, CompletableFuture<Void>> futures = new
ConcurrentHashMap<>();
+
+ LatencyMetric(Summary summary) {
+ this.summary = summary;
+ }
+
+ public void beginMeasurement(String serviceUnit) {
+ var startTimeNs = System.nanoTime();
+ futures.computeIfAbsent(serviceUnit, ignore -> {
+ var future = new CompletableFuture<Void>();
+ future.completeOnTimeout(null, OP_TIMEOUT_NS,
TimeUnit.NANOSECONDS).
+ thenAccept(__ -> {
+ var durationNs = System.nanoTime() - startTimeNs;
+ log.info("Operation {} for service unit {} took {}
ns", LatencyMetric.this, serviceUnit,
+ durationNs);
+ summary.observe(durationNs, TimeUnit.NANOSECONDS);
+ }).whenComplete((__, throwable) ->
futures.remove(serviceUnit, future));
+ return future;
+ });
+ }
+
+ public void endMeasurement(String serviceUnit) {
+ var future = futures.get(serviceUnit);
Review Comment:
Should we remove the entry first here, i.e. `futures.remove(serviceUnit);`?
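A minimal sketch of the remove-first idea, reduced to the map handling. The class below is a hypothetical stand-in, not the PR's `LatencyMetric`, and it omits the timeout and the `Summary` observation. One possible benefit is that `remove` detaches the entry atomically, so a `beginMeasurement` that arrives immediately afterwards does not find a stale, already-completed future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the measurement bookkeeping; names are illustrative. */
final class RemoveFirstSketch {

    private final Map<String, CompletableFuture<Void>> futures = new ConcurrentHashMap<>();

    /** Registers a pending measurement unless one is already in flight. */
    CompletableFuture<Void> beginMeasurement(String serviceUnit) {
        return futures.computeIfAbsent(serviceUnit, ignore -> new CompletableFuture<>());
    }

    /**
     * Atomically detaches the pending future, then completes it.
     * Returns true only if this call ended a measurement.
     */
    boolean endMeasurement(String serviceUnit) {
        var future = futures.remove(serviceUnit);
        if (future == null) {
            return false; // nothing in flight for this service unit
        }
        return future.complete(null);
    }

    /** Number of measurements still in flight. */
    int pending() {
        return futures.size();
    }
}
```

With this shape, a second `endMeasurement` call for the same service unit is a no-op, and the map no longer depends on a completion callback to clean up the entry.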
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -38,10 +41,63 @@ public class UnloadManager implements StateChangeListener {
private final UnloadCounter counter;
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+ private final String lookupServiceAddress;
+ private final LatencyMetric unloadLatency;
+ private final LatencyMetric assignLatency;
+ private final LatencyMetric releaseLatency;
- public UnloadManager(UnloadCounter counter) {
+
+ private class LatencyMetric {
+
+ private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);
+ private static final double QUANTILES[] = {0.0, 0.50, 0.95, 0.99,
0.999, 0.9999, 1.0};
+ private static final String LABEL_NAMES[] = {"broker"};
+
+ private final Summary.Child summary;
+ private final Map<String, CompletableFuture<Void>> futures = new
ConcurrentHashMap<>();
+ private final String operation;
+
+ LatencyMetric(String name, String help, String operation) {
+ var builder = Summary.build(name, help).labelNames(LABEL_NAMES);
+ for (var quantile: QUANTILES) {
+ builder.quantile(quantile);
+ }
+ this.summary = builder.register().labels(lookupServiceAddress);
+ this.operation = operation;
+ }
+
+ public void beginMeasurement(String serviceUnit) {
+ var startTimeNs = System.nanoTime();
+ futures.computeIfAbsent(serviceUnit, ignore -> {
+ var future = new CompletableFuture<Void>();
+ future.completeOnTimeout(null, OP_TIMEOUT_NS,
TimeUnit.NANOSECONDS).
+ thenAccept(__ -> {
+ var durationNs = System.nanoTime() - startTimeNs;
+ log.info("Operation {} for service unit {} took {}
ns", operation, serviceUnit, durationNs);
+ summary.observe(durationNs, TimeUnit.NANOSECONDS);
+ }).whenComplete((__, throwable) ->
futures.remove(serviceUnit, future));
+ return future;
+ });
+ }
+
+ public void endMeasurement(String serviceUnit) {
+ var future = futures.get(serviceUnit);
+ if (future != null) {
+ future.complete(null);
+ }
+ }
+ }
+
+ public UnloadManager(PulsarService pulsar, UnloadCounter counter) {
this.counter = counter;
- this.inFlightUnloadRequest = new ConcurrentHashMap<>();
+ inFlightUnloadRequest = new ConcurrentHashMap<>();
+ lookupServiceAddress =
Objects.requireNonNull(pulsar.getLookupServiceAddress());
+ unloadLatency =
+ new LatencyMetric("brk_lb_unload_latency", "Total time duration of
unload operations", "UNLOAD");
Review Comment:
Please state the units of these latency metrics in their descriptions.