asafm commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1493809702


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -205,16 +236,21 @@ public CompletableFuture<Optional<LookupResult>> 
getBrokerServiceUrlAsync(TopicN
                 });
 
         future.thenAccept(optResult -> {
-            lookupLatency.observe(System.nanoTime() - startTime, 
TimeUnit.NANOSECONDS);
+            var latencyNs = System.nanoTime() - startTime;
+            lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
+            
lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, 
TimeUnit.NANOSECONDS));

Review Comment:
   Instead of this, how about:
   `lookupLatencyHistogram.record(NANOSECONDS.toSeconds(latencyNs))`
   
   This way we don't need `MetricsUtil` since this already exists inside the 
`TimeUnit` enum.
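
One caveat worth checking before adopting this suggestion: `TimeUnit.toSeconds` returns a `long`, so the fractional part of the latency is dropped. Below is a minimal JDK-only sketch comparing it with a floating-point conversion. The class and method names (`LatencyConversion`, `truncatedSeconds`, `fractionalSeconds`) are made up for illustration and are not part of the PR.

```java
import java.util.concurrent.TimeUnit;

public class LatencyConversion {
    // Whole seconds only: TimeUnit.toSeconds returns a long and drops the fraction.
    public static long truncatedSeconds(long latencyNs) {
        return TimeUnit.NANOSECONDS.toSeconds(latencyNs);
    }

    // Fractional seconds: divide by the number of nanoseconds in one second.
    public static double fractionalSeconds(long latencyNs) {
        return latencyNs / (double) TimeUnit.SECONDS.toNanos(1);
    }

    public static void main(String[] args) {
        long latencyNs = TimeUnit.MILLISECONDS.toNanos(250); // a 250 ms lookup
        System.out.println(truncatedSeconds(latencyNs));  // prints 0
        System.out.println(fractionalSeconds(latencyNs)); // prints 0.25
    }
}
```

If the histogram is meant to record seconds as a floating-point value, a sub-second lookup would be recorded as 0 with the `long` conversion, so some form of fractional conversion is probably still needed.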



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -312,6 +313,8 @@ public PulsarService(ServiceConfiguration config,
         TransactionBatchedWriteValidator.validate(config);
         this.config = config;
 
+        this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

Review Comment:
   If we are initializing it in the constructor, can we make it `final`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -175,6 +187,25 @@ public NamespaceService(PulsarService pulsar) {
         this.bundleSplitListeners = new CopyOnWriteArrayList<>();
         this.localBrokerDataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
         this.redirectManager = new RedirectManager(pulsar);
+
+        var meter = pulsar.getOpenTelemetry().getMeter();
+        this.lookupRedirectsCounter = meter
+                .counterBuilder("pulsar.broker.lookup.redirect")
+                .setDescription("The number of lookup redirected requests")
+                .build();
+        this.lookupFailuresCounter = meter
+                .counterBuilder("pulsar.broker.lookup.failure")
+                .setDescription("The number of lookup failures")
+                .build();
+        this.lookupAnswersCounter = meter
+                .counterBuilder("pulsar.broker.lookup.answer")
+                .setDescription("The number of lookup responses (i.e. not 
redirected requests)")
+                .build();
+        this.lookupLatencyHistogram = meter
+                .histogramBuilder("pulsar.broker.lookup.latency")

Review Comment:
   I think it makes more sense to have a single instrument that will have all 
this data:
   
   Name = `pulsar.broker.lookup.request.duration`
   Attributes:
   * `pulsar.lookup.response.type`: 
      * `broker_url`
      * `redirect_url`
   * `pulsar.response.status`:
      * `success`
      * `failure` 
   
   Why?
   * Histogram already contains `count`, so we don't need a separate counter. 
We're preventing data duplication since the histogram will emit a count anyway.
   * It makes more sense to have the different traits of the response as 
attributes rather than instrument naming since it enables an over-arching look 
at lookup as a whole, and then, if one wishes, they can slice and dice 
accordingly, very quickly. When splitting the traits into separate instruments, 
it is hard to know the total lookup request count.
   * I prefix the attributes since when I think about the user, I remember how 
hard it is to tell apart attributes coming from Kubernetes, the host, and the 
application itself (Pulsar). Here, we make it very clear that one attribute is 
specific to lookup, and the other (`response.status`) is relevant to any 
request to Pulsar, meaning there's a good chance we'll reuse it in other 
request metrics.
   * I decided to use `response.type` containing `broker_url` and 
`redirect_url` since, to me, it is much more straightforward than "answer" and 
"redirect".
   
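To illustrate the first two points without depending on the OpenTelemetry SDK, here is a plain-Java model of one duration instrument keyed by the two attributes. This is not the OTel API; `LookupDurationModel` and its methods are hypothetical. The point is only that the total request count and any per-attribute slice both fall out of the same recorded data.

```java
import java.util.DoubleSummaryStatistics;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupDurationModel {
    // One series per (response type, status) pair, similar to what an
    // attribute-aware histogram keeps. Single-threaded model for illustration.
    private final Map<List<String>, DoubleSummaryStatistics> series = new HashMap<>();

    public void record(double seconds, String responseType, String status) {
        series.computeIfAbsent(List.of(responseType, status), k -> new DoubleSummaryStatistics())
                .accept(seconds);
    }

    // Total lookup requests: sum the count of every series.
    public long totalCount() {
        return series.values().stream().mapToLong(DoubleSummaryStatistics::getCount).sum();
    }

    // Slice by status, regardless of response type.
    public long countByStatus(String status) {
        return series.entrySet().stream()
                .filter(e -> e.getKey().get(1).equals(status))
                .mapToLong(e -> e.getValue().getCount())
                .sum();
    }

    public static void main(String[] args) {
        LookupDurationModel model = new LookupDurationModel();
        model.record(0.004, "broker_url", "success");
        model.record(0.002, "redirect_url", "success");
        model.record(0.050, "broker_url", "failure");
        System.out.println(model.totalCount());             // prints 3
        System.out.println(model.countByStatus("success")); // prints 2
    }
}
```

With separate counters per outcome, the total would have to be reassembled by adding instruments together on the query side.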



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
         this.defaultServerBootstrap = defaultServerBootstrap();
 
         this.pendingLookupRequests = 
ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
-                .supplier(() -> 
pulsar.getConfig().getMaxConcurrentLookupRequest()
-                        - lookupRequestSemaphore.get().availablePermits())
+                .supplier(this::getPendingLookupRequest)
                 .register();
+        this.pendingLookupRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+                .ofLongs()
+                .setDescription("The number of pending lookup requests in the 
broker. "

Review Comment:
   We should also define the unit: `{request}`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
         this.defaultServerBootstrap = defaultServerBootstrap();
 
         this.pendingLookupRequests = 
ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
-                .supplier(() -> 
pulsar.getConfig().getMaxConcurrentLookupRequest()
-                        - lookupRequestSemaphore.get().availablePermits())
+                .supplier(this::getPendingLookupRequest)
                 .register();
+        this.pendingLookupRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+                .ofLongs()
+                .setDescription("The number of pending lookup requests in the 
broker. "
+                        + "When it reaches threshold 
\"maxConcurrentLookupRequest\" defined in broker.conf, "
+                        + "new requests are rejected.")
+                .buildWithCallback(measurement -> 
measurement.record(getPendingLookupRequest()));
+        this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.limit")
+                .ofLongs()
+                .setDescription("The maximum number of pending lookup requests 
in the broker. "
+                        + "Equal to \"maxConcurrentLookupRequest\" defined in 
broker.conf.")
+                .buildWithCallback(
+                        measurement -> 
measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));
 
         this.pendingTopicLoadRequests = ObserverGauge.build(
-                "pulsar_broker_topic_load_pending_requests", "-")
-                .supplier(() -> 
pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
-                        - topicLoadRequestSemaphore.get().availablePermits())
+                        "pulsar_broker_topic_load_pending_requests", "-")
+                .supplier(this::getPendingTopicLoadRequests)
                 .register();
+        this.pendingTopicLoadRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.topic.load.pending.request.usage")

Review Comment:
   I suggest the names:
   * `pulsar.broker.topic.load.operation.pending.usage`
   * `pulsar.broker.topic.load.operation.pending.limit`
   
   Why?
   * I think loading a topic is not a request but an operation. Contrast that 
to a lookup request, which is actually a request coming from Admin API.
   * I think `pending` should come after what describes it.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
         this.defaultServerBootstrap = defaultServerBootstrap();
 
         this.pendingLookupRequests = 
ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
-                .supplier(() -> 
pulsar.getConfig().getMaxConcurrentLookupRequest()
-                        - lookupRequestSemaphore.get().availablePermits())
+                .supplier(this::getPendingLookupRequest)
                 .register();
+        this.pendingLookupRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")

Review Comment:
   I would like to align naming with the name I suggested in the previous 
comment since OTel heavily recommends working properly with hierarchy.
   I suggest: 
   * `pulsar.broker.lookup.request.pending.usage`
   * `pulsar.broker.lookup.request.pending.limit`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java:
##########
@@ -34,10 +37,17 @@ public class PulsarBrokerOpenTelemetry implements Closeable 
{
     private final Meter meter;
 
     public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+        this(config, null);
+    }
+
+    @VisibleForTesting
+    public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
+                                     
Consumer<AutoConfiguredOpenTelemetrySdkBuilder> builderCustomizer) {
         openTelemetryService = OpenTelemetryService.builder()
                 .clusterName(config.getClusterName())
                 .serviceName(SERVICE_NAME)
                 .serviceVersion(PulsarVersion.getVersion())
+                .sdkBuilderConsumer(builderCustomizer)

Review Comment:
   I just noticed this, but please rename: `sdkBuilderConsumer` --> `setBuilderCustomizer`



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java:
##########
@@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception {
         
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new 
AtomicReference<>(loadManager1));
 
+        var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();

Review Comment:
   I have 2 big problems with this test block:
   1. It's stuck in the middle of a test method that tests something completely 
different. Suddenly, in the middle of it, we're setting up and testing 
something else, ruining the readability of the original test method.
   2. It's unreadable and so tightly coupled with the implementation that I can 
barely call it a test. I would use this technique only as a last resort.
   
   My question is: can we use `spy` and the Pulsar service customizer in such 
a way that when `NamespaceService.getBrokerServiceUrlAsync` is invoked, it 
actually blocks on something like a `CountDownLatch`? Then you grab the 
pending metric, see that it's 1, and release the latch.
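
A minimal JDK-only sketch of the latch idea, assuming the pending count is derived from a semaphore the same way as in the diff (limit minus available permits). `PendingLookupLatchSketch` and its methods are hypothetical stand-ins. A real test would block inside a spied `NamespaceService` rather than in this toy `lookup` method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PendingLookupLatchSketch {
    private final int limit;
    private final Semaphore permits;

    public PendingLookupLatchSketch(int limit) {
        this.limit = limit;
        this.permits = new Semaphore(limit);
    }

    // Same arithmetic as the gauge supplier in the diff: limit minus available permits.
    public int pendingRequests() {
        return limit - permits.availablePermits();
    }

    // Stand-in for a lookup whose body is stubbed to block until the test releases it.
    public CompletableFuture<Void> lookup(CountDownLatch started, CountDownLatch release) {
        return CompletableFuture.runAsync(() -> {
            permits.acquireUninterruptibly();
            try {
                started.countDown(); // tell the test the request is now in flight
                release.await();     // stay pending until the test releases the latch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                permits.release();
            }
        });
    }

    // Returns {pending while blocked, pending after release}.
    public static int[] run() {
        try {
            PendingLookupLatchSketch service = new PendingLookupLatchSketch(10);
            CountDownLatch started = new CountDownLatch(1);
            CountDownLatch release = new CountDownLatch(1);

            CompletableFuture<Void> inFlight = service.lookup(started, release);
            if (!started.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("lookup never started");
            }
            int whilePending = service.pendingRequests();

            release.countDown();
            inFlight.get(5, TimeUnit.SECONDS);
            return new int[] {whilePending, service.pendingRequests()};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] observed = run();
        // prints "1 pending while blocked, 0 after release"
        System.out.println(observed[0] + " pending while blocked, " + observed[1] + " after release");
    }
}
```

The `started` latch is what makes the assertion deterministic: the metric is read only after the request is known to hold a permit.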
   
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
         this.defaultServerBootstrap = defaultServerBootstrap();
 
         this.pendingLookupRequests = 
ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
-                .supplier(() -> 
pulsar.getConfig().getMaxConcurrentLookupRequest()
-                        - lookupRequestSemaphore.get().availablePermits())
+                .supplier(this::getPendingLookupRequest)
                 .register();
+        this.pendingLookupRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")

Review Comment:
   According to [OpenTelemetry Specifications for 
Gauge](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-gauge):
   > Asynchronous Gauge is an [asynchronous 
Instrument](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-instrument-api)
 which reports non-additive value(s) (e.g. the room temperature - it makes no 
sense to report the temperature value from multiple rooms and sum them up) when 
the instrument is being observed.
   
   > Note: if the values are additive (e.g. the process heap size - it makes 
sense to report the heap size from multiple processes and sum them up, so we 
get the total heap usage), use [Asynchronous 
Counter](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-counter)
 or [Asynchronous 
UpDownCounter](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-updowncounter).
   
   Hence this should be an UpDownCounter



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
         this.defaultServerBootstrap = defaultServerBootstrap();
 
         this.pendingLookupRequests = 
ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
-                .supplier(() -> 
pulsar.getConfig().getMaxConcurrentLookupRequest()
-                        - lookupRequestSemaphore.get().availablePermits())
+                .supplier(this::getPendingLookupRequest)
                 .register();
+        this.pendingLookupRequestsCounter = 
pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+                .ofLongs()
+                .setDescription("The number of pending lookup requests in the 
broker. "
+                        + "When it reaches threshold 
\"maxConcurrentLookupRequest\" defined in broker.conf, "
+                        + "new requests are rejected.")
+                .buildWithCallback(measurement -> 
measurement.record(getPendingLookupRequest()));
+        this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.limit")
+                .ofLongs()
+                .setDescription("The maximum number of pending lookup requests 
in the broker. "

Review Comment:
   Also add the unit here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -241,8 +243,15 @@ public class BrokerService implements Closeable {
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
     protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
 
+    @PulsarDeprecatedMetric(newMetricName = 
"pulsar.broker.lookup.pending.request.usage")
     private final ObserverGauge pendingLookupRequests;
+    private final ObservableLongGauge pendingLookupRequestsCounter;
+    private final ObservableLongGauge pendingLookupRequestsLimit;
+
+    @PulsarDeprecatedMetric(newMetricName = 
"pulsar.broker.topic.load.pending.request.usage")
     private final ObserverGauge pendingTopicLoadRequests;
+    private final ObservableLongGauge pendingTopicLoadRequestsCounter;

Review Comment:
   `pendingTopicLoadRequestsCounter` -> `pendingTopicLoadRequestsGauge`
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -241,8 +243,15 @@ public class BrokerService implements Closeable {
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
     protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
 
+    @PulsarDeprecatedMetric(newMetricName = 
"pulsar.broker.lookup.pending.request.usage")
     private final ObserverGauge pendingLookupRequests;
+    private final ObservableLongGauge pendingLookupRequestsCounter;
+    private final ObservableLongGauge pendingLookupRequestsLimit;
+
+    @PulsarDeprecatedMetric(newMetricName = 
"pulsar.broker.topic.load.pending.request.usage")
     private final ObserverGauge pendingTopicLoadRequests;
+    private final ObservableLongGauge pendingTopicLoadRequestsCounter;
+    private final ObservableLongGauge pendingTopicLoadRequestsLimit;

Review Comment:
   `pendingTopicLoadRequestsLimit` -> `pendingTopicLoadRequestsLimitGauge`



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);

Review Comment:
   Why not `builder.enableOpenTelemetry()`?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java:
##########
@@ -727,11 +733,24 @@ protected void initializePulsarServices(SpyConfig 
spyConfig, Builder builder) {
                     .equals(PulsarCompactionServiceFactory.class.getName())) {
                 compactionServiceFactory = new 
MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
             }
+            PulsarBrokerOpenTelemetry pulsarBrokerOpenTelemetry;
+            if (builder.enableOpenTelemetry) {
+                var reader = InMemoryMetricReader.create();
+                pulsarBrokerOpenTelemetry = new 
PulsarBrokerOpenTelemetry(builder.config, builderCustomizer -> {
+                    builderCustomizer.addMeterProviderCustomizer(
+                            (meterProviderBuilder, __) -> 
meterProviderBuilder.registerMetricReader(reader));
+                    builderCustomizer.addPropertiesSupplier(
+                            () -> 
Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"));
+                });
+                openTelemetryMetricReader(reader);

Review Comment:
   I don't get this command.
   `openTelemetryMetricReader` is a variable, so why does this look like a 
function call?
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
     /**
-     * Verifies: updating zk-throttling node reflects 
broker-maxConcurrentLookupRequest and updates semaphore.
-     *
-     * @throws Exception
+     * Verifies: updating zk-throttling node reflects 
broker-maxConcurrentLookupRequest and updates semaphore, as well
+     * as the related limit metric value.
      */
     @Test
     public void testThrottlingLookupRequestSemaphore() throws Exception {
-        BrokerService service = pulsar.getBrokerService();
-        
assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
-        
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", 
Integer.toString(0));
-        Thread.sleep(1000);
-        assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 
0);
+        throtllingRequestSemaphoresTestHelper(

Review Comment:
   When you read this, you can't understand what is being tested. We call a 
helper that is supposed to help you with testing `throttlingRequestSemaphore`. 
   It's preferable to take the 10 lines of code in that helper method, convert 
them to 3 helper methods like `setSemaphore` and `assertMetricHasValue()`, and 
then use those in each test method, so that in the end I will be able to read 
through the test scenario.
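
A rough sketch of what that could look like, using only the JDK. Everything here is hypothetical: `ThrottlingScenarioSketch`, the in-memory `limitMetricValue` field standing in for the real metric reader, and the helper signatures. Only the overall shape (small named steps, scenario readable top to bottom) is the point.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

public class ThrottlingScenarioSketch {
    // Hypothetical stand-ins for the broker's semaphore and the limit metric's value.
    private final AtomicReference<Semaphore> lookupRequestSemaphore =
            new AtomicReference<>(new Semaphore(100));
    private long limitMetricValue = 100;

    // Helper 1: apply a new limit, as a dynamic configuration update would.
    public void setSemaphore(int limit) {
        lookupRequestSemaphore.set(new Semaphore(limit));
        limitMetricValue = limit;
    }

    // Helper 2: check the semaphore state.
    public void assertSemaphoreHasPermits(int expected) {
        int actual = lookupRequestSemaphore.get().availablePermits();
        if (actual != expected) {
            throw new AssertionError("permits: expected " + expected + " but was " + actual);
        }
    }

    // Helper 3: check the metric value.
    public void assertMetricHasValue(long expected) {
        if (limitMetricValue != expected) {
            throw new AssertionError("metric: expected " + expected + " but was " + limitMetricValue);
        }
    }

    // The scenario itself now reads as a sequence of steps.
    public void testThrottlingLookupRequestSemaphore() {
        assertSemaphoreHasPermits(100);
        setSemaphore(0);
        assertSemaphoreHasPermits(0);
        assertMetricHasValue(0);
    }

    public static void main(String[] args) {
        new ThrottlingScenarioSketch().testThrottlingLookupRequestSemaphore();
        System.out.println("scenario passed");
    }
}
```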
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java:
##########
@@ -44,10 +45,16 @@ public StartableTestPulsarService(SpyConfig spyConfig, 
ServiceConfiguration conf
                                       CompactionServiceFactory 
compactionServiceFactory,
                                       BrokerInterceptor brokerInterceptor,
                                       BookKeeperClientFactory 
bookKeeperClientFactory,
-                                      Function<BrokerService, BrokerService> 
brokerServiceCustomizer) {
+                                      Function<BrokerService, BrokerService> 
brokerServiceCustomizer,
+                                      PulsarBrokerOpenTelemetry openTelemetry) 
{
         super(spyConfig, config, localMetadataStore, 
configurationMetadataStore, compactionServiceFactory,
                 brokerInterceptor, bookKeeperClientFactory);
         this.brokerServiceCustomizer = brokerServiceCustomizer;
+        if (openTelemetry != null) {
+            // Replace existing OpenTelemetry wrapper class.
+            this.openTelemetry.close();

Review Comment:
   This looked different from the other fields in `PulsarService` when I 
examined this class. Why isn't it instantiated the way we want it to be for the 
test?
   @lhotari Can you take a closer look here?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
