dragosvictor commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496322969


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -205,16 +236,21 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
                 });
 
         future.thenAccept(optResult -> {
-            lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+            var latencyNs = System.nanoTime() - startTime;
+            lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
+            lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS));

Review Comment:
   The problem is that the converted values would end up being 0 most of the
time. If we're to standardize on seconds as the unit of time (as OpenTelemetry
recommends, and I think we should), these numbers must be doubles, but
`TimeUnit.toSeconds` 'speaks' `long` only, so we'd lose virtually all precision
for small values.
   
   I'll add a comment to the utility class to explain this decision.
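   To make the precision problem concrete, here is a minimal sketch. The body of
`MetricsUtil.convertToSeconds` is not shown in this diff, so `toSecondsDouble`
below is a hypothetical stand-in, assuming the conversion goes through
nanoseconds into a `double`:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the actual MetricsUtil implementation.
final class LatencyConversion {
    private static final double NANOS_PER_SECOND = 1_000_000_000.0;

    // Converts via nanoseconds into a double, keeping sub-second precision.
    // Note: TimeUnit.toNanos saturates at Long.MAX_VALUE for very large inputs.
    static double toSecondsDouble(long duration, TimeUnit unit) {
        return unit.toNanos(duration) / NANOS_PER_SECOND;
    }

    public static void main(String[] args) {
        long latencyNs = 1_500_000L; // 1.5 ms
        // TimeUnit uses integer division, so any sub-second latency truncates to 0.
        System.out.println(TimeUnit.NANOSECONDS.toSeconds(latencyNs)); // 0
        System.out.println(toSecondsDouble(latencyNs, TimeUnit.NANOSECONDS)); // 0.0015
    }
}
```

A lookup that takes 1.5 ms would therefore be recorded as `0` seconds through
`TimeUnit`, but as `0.0015` through the `double` path.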



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -312,6 +313,8 @@ public PulsarService(ServiceConfiguration config,
         TransactionBatchedWriteValidator.validate(config);
         this.config = config;
 
+        this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

Review Comment:
   I would, but it's being replaced in the tests: 
https://github.com/apache/pulsar/pull/22058/files#diff-f6308e8022fc0713d66fa883d166e7b9189865b3c8ff9b79aaccaf97d69f2f25R56-R57.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
         this.defaultServerBootstrap = defaultServerBootstrap();
 
         this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
-                .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
-                        - lookupRequestSemaphore.get().availablePermits())
+                .supplier(this::getPendingLookupRequest)
                 .register();
+        this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+                .ofLongs()
+                .setDescription("The number of pending lookup requests in the broker. "
+                        + "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+                        + "new requests are rejected.")
+                .buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
+        this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.lookup.pending.request.limit")
+                .ofLongs()
+                .setDescription("The maximum number of pending lookup requests in the broker. "
+                        + "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.")
+                .buildWithCallback(
+                        measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));
 
         this.pendingTopicLoadRequests = ObserverGauge.build(
-                "pulsar_broker_topic_load_pending_requests", "-")
-                .supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
-                        - topicLoadRequestSemaphore.get().availablePermits())
+                        "pulsar_broker_topic_load_pending_requests", "-")
+                .supplier(this::getPendingTopicLoadRequests)
                 .register();
+        this.pendingTopicLoadRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+                .gaugeBuilder("pulsar.broker.topic.load.pending.request.usage")

Review Comment:
   Name changes sound good!
   
   Note that the same can be said about the lookup request semaphore, as it is
also used by requests other than topic lookup:
[handlePartitionMetadataRequest](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L579C20-L579C50),
[handleGetTopicsOfNamespace](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L2391C20-L2391C46)
and
[handleCommandWatchTopicList](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L3036C20-L3036C47).
Perhaps we can rename it to
`pulsar.broker.topic.lookup.operation.pending.[usage,limit]`?
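   For reference, the `.usage` and `.limit` gauges in the `BrokerService` hunk
derive their values from the configured maximum and the semaphore's free
permits (`max - availablePermits()`). The sketch below is a hypothetical,
self-contained stand-in (`LookupThrottle` is not a Pulsar class); it assumes a
fixed limit, whereas the broker accesses its semaphore through `.get()` so it
can be replaced when the limit changes:

```java
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for the broker's lookup throttling state; not Pulsar's actual class.
final class LookupThrottle {
    private final int maxConcurrentLookupRequest;
    private final Semaphore lookupRequestSemaphore;

    LookupThrottle(int maxConcurrentLookupRequest) {
        this.maxConcurrentLookupRequest = maxConcurrentLookupRequest;
        this.lookupRequestSemaphore = new Semaphore(maxConcurrentLookupRequest);
    }

    // What a ".usage" gauge callback would record: permits held by in-flight requests.
    int getPendingLookupRequest() {
        return maxConcurrentLookupRequest - lookupRequestSemaphore.availablePermits();
    }

    // What a ".limit" gauge callback would record: the configured maximum.
    int getLimit() {
        return maxConcurrentLookupRequest;
    }

    // Returns false (the request is rejected) once the limit has been reached.
    boolean tryStartLookup() {
        return lookupRequestSemaphore.tryAcquire();
    }

    void finishLookup() {
        lookupRequestSemaphore.release();
    }

    public static void main(String[] args) {
        LookupThrottle throttle = new LookupThrottle(2);
        throttle.tryStartLookup();
        System.out.println(throttle.getPendingLookupRequest() + "/" + throttle.getLimit()); // 1/2
        System.out.println(throttle.tryStartLookup() + " " + throttle.tryStartLookup()); // true false
    }
}
```

Since any caller holding a permit counts toward `usage`, the gauge reflects
every request type guarded by the semaphore, which is the reason a broader
metric name fits better.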



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java:
##########
@@ -44,10 +45,16 @@ public StartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration conf
                                       CompactionServiceFactory compactionServiceFactory,
                                       BrokerInterceptor brokerInterceptor,
                                       BookKeeperClientFactory bookKeeperClientFactory,
-                                      Function<BrokerService, BrokerService> brokerServiceCustomizer) {
+                                      Function<BrokerService, BrokerService> brokerServiceCustomizer,
+                                      PulsarBrokerOpenTelemetry openTelemetry) {
         super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
                 brokerInterceptor, bookKeeperClientFactory);
         this.brokerServiceCustomizer = brokerServiceCustomizer;
+        if (openTelemetry != null) {
+            // Replace existing OpenTelemetry wrapper class.
+            this.openTelemetry.close();

Review Comment:
   The question here is how much we want to parameterize the `PulsarService`
constructor to make room for testing purposes. At some point we might want to
use a builder for it too :)



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java:
##########
@@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception {
         doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
         loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
 
+        var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();

Review Comment:
   If we want to test these metrics within the existing unit tests, we have
limited options for where to insert our validation logic. Point 1 is an
unfortunate consequence. The tests are highly coupled with the implementation,
agreed, but one way or another we need to control the code paths of the test,
whether via mocks or intrusive reflection.
   
   Regarding the suggestion: that's loosely what I initially did, mocking
`getBrokerServiceUrlAsync` instead of the semaphore itself. The problem is that
this is actually more coupled with the implementation. Since we're validating
the semaphore-related metrics, we should narrow the scope of the mock as
closely as possible to the semaphore. Assuming that `getBrokerServiceUrlAsync`
is called within the scope of the semaphore is a deeper coupling between the
test and the implementation.
   
   I like the `CountDownLatch` recommendation; I agree it will improve
readability here.
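   A minimal sketch of the latch pattern, assuming a plain
`java.util.concurrent.Semaphore` in place of the broker's lookup semaphore (the
class and method names are hypothetical). One latch signals that the worker
holds a permit, a second keeps it held until the test has taken its reading,
so the "pending" value is deterministic without `Thread.sleep`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not code from the PR.
final class LatchedSemaphoreCheck {

    /** Returns {pendingWhileHeld, pendingAfterRelease}. */
    static int[] observePending(int limit) {
        Semaphore semaphore = new Semaphore(limit);
        CountDownLatch permitAcquired = new CountDownLatch(1);
        CountDownLatch mayRelease = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            semaphore.acquireUninterruptibly();
            try {
                permitAcquired.countDown(); // signal: the permit is now held
                mayRelease.await();         // hold it until the test has read the value
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                semaphore.release();
            }
        });
        worker.start();

        try {
            if (!permitAcquired.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker never acquired a permit");
            }
            int pendingWhileHeld = limit - semaphore.availablePermits();
            mayRelease.countDown();
            worker.join(); // release() in the worker happens-before join() returns
            int pendingAfterRelease = limit - semaphore.availablePermits();
            return new int[] {pendingWhileHeld, pendingAfterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] pending = observePending(10);
        System.out.println(pending[0] + " " + pending[1]); // 1 0
    }
}
```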



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java:
##########
@@ -727,11 +733,24 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
                     .equals(PulsarCompactionServiceFactory.class.getName())) {
                 compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
             }
+            PulsarBrokerOpenTelemetry pulsarBrokerOpenTelemetry;
+            if (builder.enableOpenTelemetry) {
+                var reader = InMemoryMetricReader.create();
+                pulsarBrokerOpenTelemetry = new PulsarBrokerOpenTelemetry(builder.config, builderCustomizer -> {
+                    builderCustomizer.addMeterProviderCustomizer(
+                            (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
+                    builderCustomizer.addPropertiesSupplier(
+                            () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"));
+                });
+                openTelemetryMetricReader(reader);

Review Comment:
   This is a consequence of using the `lombok` builder.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java:
##########
@@ -34,10 +37,17 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
     private final Meter meter;
 
     public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+        this(config, null);
+    }
+
+    @VisibleForTesting
+    public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
+                                     Consumer<AutoConfiguredOpenTelemetrySdkBuilder> builderCustomizer) {
         openTelemetryService = OpenTelemetryService.builder()
                 .clusterName(config.getClusterName())
                 .serviceName(SERVICE_NAME)
                 .serviceVersion(PulsarVersion.getVersion())
+                .sdkBuilderConsumer(builderCustomizer)

Review Comment:
   The `lombok` builder annotation sets the names of these methods. If I change
this one, it would be reasonable to change the others too (`setClusterName`,
`setServiceName`, etc.). That doesn't look like a worthwhile change, and we'd be
deviating from the conventions used in other builder classes within Pulsar.
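   To illustrate the naming convention, here is a rough, hand-expanded
approximation of what Lombok's `@Builder` generates for a two-field class. The
class and field names are hypothetical (they are not `OpenTelemetryService`'s
real fields), and the getters are added only for the example; `@Builder`
itself does not generate them:

```java
// Hypothetical, hand-written approximation of Lombok @Builder output; illustrative names only.
final class ServiceSettings {
    private final String clusterName;
    private final String serviceName;

    ServiceSettings(String clusterName, String serviceName) {
        this.clusterName = clusterName;
        this.serviceName = serviceName;
    }

    static ServiceSettingsBuilder builder() {
        return new ServiceSettingsBuilder();
    }

    // Accessors added for the example only.
    String getClusterName() { return clusterName; }
    String getServiceName() { return serviceName; }

    static final class ServiceSettingsBuilder {
        private String clusterName;
        private String serviceName;

        // Each builder method is named after its field (no "set" prefix) and returns the builder.
        ServiceSettingsBuilder clusterName(String clusterName) {
            this.clusterName = clusterName;
            return this;
        }

        ServiceSettingsBuilder serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        ServiceSettings build() {
            return new ServiceSettings(clusterName, serviceName);
        }
    }
}
```

Usage reads like the calls in the hunk above, e.g.
`ServiceSettings.builder().clusterName("c1").serviceName("pulsar-broker").build()`,
which is why a `set` prefix on a single method would stand out.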



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @Override
+    protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);

Review Comment:
   Similar to the previous one, this results from using `lombok` annotations. I
could add an overload without any parameters, but IMO that would defeat the
purpose of the annotation.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @Override
+    protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
     /**
-     * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
-     *
-     * @throws Exception
+     * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore, as well
+     * as the related limit metric value.
      */
     @Test
     public void testThrottlingLookupRequestSemaphore() throws Exception {
-        BrokerService service = pulsar.getBrokerService();
-        assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
-        admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0));
-        Thread.sleep(1000);
-        assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
+        throtllingRequestSemaphoresTestHelper(

Review Comment:
   Refactored this, thanks!



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -175,6 +187,25 @@ public NamespaceService(PulsarService pulsar) {
         this.bundleSplitListeners = new CopyOnWriteArrayList<>();
         this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
         this.redirectManager = new RedirectManager(pulsar);
+
+        var meter = pulsar.getOpenTelemetry().getMeter();
+        this.lookupRedirectsCounter = meter
+                .counterBuilder("pulsar.broker.lookup.redirect")
+                .setDescription("The number of lookup redirected requests")
+                .build();
+        this.lookupFailuresCounter = meter
+                .counterBuilder("pulsar.broker.lookup.failure")
+                .setDescription("The number of lookup failures")
+                .build();
+        this.lookupAnswersCounter = meter
+                .counterBuilder("pulsar.broker.lookup.answer")
+                .setDescription("The number of lookup responses (i.e. not redirected requests)")
+                .build();
+        this.lookupLatencyHistogram = meter
+                .histogramBuilder("pulsar.broker.lookup.latency")

Review Comment:
   Good suggestion. Note that this would split the latency numbers by the
respective attributes, so users would have to implement some logic on top of
that to aggregate them back into what `pulsar_broker_lookup` currently
expresses. @merlimat @lhotari, thoughts on this?
   
   Regarding the response type attribute names, perhaps `broker` and `redirect`
are a better fit? Adding the `_url` suffix implies that the value holds the
actual response URL.
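   As a rough illustration of the re-aggregation users would need: explicit-bucket
histograms that share the same bucket boundaries can be merged by summing
bucket counts and sums element-wise. The sketch below is plain Java, not the
OpenTelemetry SDK; the bucket bounds are assumed, and `broker`/`redirect` are
the attribute values proposed above:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of merging per-attribute latency series; not OpenTelemetry SDK code.
final class LatencySeriesMerge {
    // Assumed bucket upper bounds in seconds; every series must share them for merging to be valid.
    static final double[] BOUNDS = {0.001, 0.01, 0.1, 1.0};

    /** One series per attribute value: BOUNDS.length + 1 buckets (last = overflow) plus a sum. */
    static final class Series {
        final long[] bucketCounts = new long[BOUNDS.length + 1];
        double sumSeconds;

        // Bucket i counts values in (BOUNDS[i - 1], BOUNDS[i]].
        void record(double seconds) {
            int i = 0;
            while (i < BOUNDS.length && seconds > BOUNDS[i]) {
                i++;
            }
            bucketCounts[i]++;
            sumSeconds += seconds;
        }

        long count() {
            return Arrays.stream(bucketCounts).sum();
        }
    }

    /** Sums series with identical boundaries into one aggregate series. */
    static Series merge(Iterable<Series> parts) {
        Series total = new Series();
        for (Series s : parts) {
            for (int i = 0; i < total.bucketCounts.length; i++) {
                total.bucketCounts[i] += s.bucketCounts[i];
            }
            total.sumSeconds += s.sumSeconds;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Series> byResponseType = new LinkedHashMap<>();
        byResponseType.computeIfAbsent("broker", k -> new Series()).record(0.0005);
        byResponseType.computeIfAbsent("redirect", k -> new Series()).record(0.02);
        byResponseType.computeIfAbsent("redirect", k -> new Series()).record(0.5);
        Series total = merge(byResponseType.values());
        System.out.println(total.count() + " " + Arrays.toString(total.bucketCounts)); // 3 [1, 0, 1, 1, 0]
    }
}
```

Query backends typically do this summation when an attribute is dropped from
a query, so the extra work mostly falls on users who consume raw series.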



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
