dragosvictor commented on code in PR #21854:
URL: https://github.com/apache/pulsar/pull/21854#discussion_r1442231670
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java:
##########
@@ -38,10 +41,81 @@ public class UnloadManager implements StateChangeListener {
private final UnloadCounter counter;
private final Map<String, CompletableFuture<Void>> inFlightUnloadRequest;
+ private final String lookupServiceAddress;
- public UnloadManager(UnloadCounter counter) {
+ private static final Summary unloadLatency =
+ Summary.build("brk_lb_unload_latency", "Total time duration of
unload operations")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
+
+ private static final Summary releaseLatency =
+ Summary.build("brk_lb_release_latency", "Time spent in the load
balancing RELEASE state")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
+ private static final Summary assignLatency =
+ Summary.build("brk_lb_assign_latency", "Time spent in the load
balancing ASSIGN state")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
+ private enum LatencyMetric {
+ UNLOAD(unloadLatency), RELEASE(releaseLatency), ASSIGN(assignLatency);
+
+ private static final long OP_TIMEOUT_NS = TimeUnit.HOURS.toNanos(1);
+
+ private final Summary summary;
+ private final Map<String, CompletableFuture<Void>> futures = new
ConcurrentHashMap<>();
+
+ LatencyMetric(Summary summary) {
+ this.summary = summary;
+ }
+
+ public void beginMeasurement(String serviceUnit) {
+ var startTimeNs = System.nanoTime();
+ futures.computeIfAbsent(serviceUnit, ignore -> {
+ var future = new CompletableFuture<Void>();
+ future.completeOnTimeout(null, OP_TIMEOUT_NS,
TimeUnit.NANOSECONDS).
+ thenAccept(__ -> {
+ var durationNs = System.nanoTime() - startTimeNs;
+ log.info("Operation {} for service unit {} took {}
ns", LatencyMetric.this, serviceUnit,
+ durationNs);
+ summary.observe(durationNs, TimeUnit.NANOSECONDS);
+ }).whenComplete((__, throwable) ->
futures.remove(serviceUnit, future));
+ return future;
+ });
+ }
+
+ public void endMeasurement(String serviceUnit) {
+ var future = futures.get(serviceUnit);
Review Comment:
I am relying on the `whenComplete` stage to remove the entry from the map.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]