dragosvictor commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1496322969
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -205,16 +236,21 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
});
future.thenAccept(optResult -> {
- lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ var latencyNs = System.nanoTime() - startTime;
+ lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
+ lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS));
Review Comment:
The problem is that the values would end up being 0 most of the time. If we
standardize on seconds as the unit of time (as OpenTelemetry recommends, and
I think we should), these numbers must be doubles, but `TimeUnit.toSeconds`
only returns a `long`, so we'd lose virtually all precision for sub-second
values.
I'll add a comment to the utility class to explain this decision.
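A minimal sketch of the precision problem follows. `SecondsConversion` and its methods are hypothetical illustrations, not Pulsar's `MetricsUtil`, whose implementation is not shown in this diff.

```java
import java.util.concurrent.TimeUnit;

// Illustration only: SecondsConversion is hypothetical, not Pulsar's MetricsUtil.
final class SecondsConversion {
    private SecondsConversion() {}

    // TimeUnit.toSeconds returns a long, so every duration under one second becomes 0.
    static long truncatedSeconds(long latencyNs) {
        return TimeUnit.NANOSECONDS.toSeconds(latencyNs);
    }

    // Dividing as a double keeps the sub-second fraction (e.g. 2.5 ms -> 0.0025 s).
    static double toSecondsDouble(long duration, TimeUnit unit) {
        return (double) unit.toNanos(duration) / TimeUnit.SECONDS.toNanos(1);
    }
}
```

A 2.5 ms lookup reports `0` through `truncatedSeconds` but `0.0025` through `toSecondsDouble`, which is why a `double`-returning conversion is needed once seconds become the base unit.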
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -312,6 +313,8 @@ public PulsarService(ServiceConfiguration config,
TransactionBatchedWriteValidator.validate(config);
this.config = config;
+ this.openTelemetry = new PulsarBrokerOpenTelemetry(config);
Review Comment:
I would, but it's being replaced in the tests:
https://github.com/apache/pulsar/pull/22058/files#diff-f6308e8022fc0713d66fa883d166e7b9189865b3c8ff9b79aaccaf97d69f2f25R56-R57.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();
this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- - lookupRequestSemaphore.get().availablePermits())
+ .supplier(this::getPendingLookupRequest)
.register();
+ this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+ .ofLongs()
+ .setDescription("The number of pending lookup requests in the broker. "
+ + "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ + "new requests are rejected.")
+ .buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
+ this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.limit")
+ .ofLongs()
+ .setDescription("The maximum number of pending lookup requests in the broker. "
+ + "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.")
+ .buildWithCallback(
+ measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));
this.pendingTopicLoadRequests = ObserverGauge.build(
- "pulsar_broker_topic_load_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- - topicLoadRequestSemaphore.get().availablePermits())
+ "pulsar_broker_topic_load_pending_requests", "-")
+ .supplier(this::getPendingTopicLoadRequests)
.register();
+ this.pendingTopicLoadRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.topic.load.pending.request.usage")
Review Comment:
Name changes sound good!
Note that the same applies to the lookup request metrics, since the lookup
request semaphore is also used by requests other than topic lookup:
[handlePartitionMetadataRequest](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L579C20-L579C50),
[handleGetTopicsOfNamespace](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L2391C20-L2391C46)
and
[handleCommandWatchTopicList](https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L3036C20-L3036C47).
Perhaps we can rename these metrics to
`pulsar.broker.topic.lookup.operation.pending.[usage,limit]`?
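To illustrate why a lookup-only name undersells the metric: when several request types draw permits from one semaphore, the usage value (limit minus available permits, as in the `supplier` lambda removed in the hunk above) counts all of them. The sketch assumes a plain `java.util.concurrent.Semaphore`; `PendingOperationGauge` is a hypothetical name, not a Pulsar class.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model of a broker-wide permit pool shared by several request types.
final class PendingOperationGauge {
    private final int limit;
    private final Semaphore permits;

    PendingOperationGauge(int limit) {
        this.limit = limit;
        this.permits = new Semaphore(limit);
    }

    // Any operation (topic lookup, partition metadata, topic listing, ...) uses the same pool.
    boolean tryStart() {
        return permits.tryAcquire();
    }

    void finish() {
        permits.release();
    }

    // "usage" gauge: permits currently held, regardless of which operation holds them.
    long usage() {
        return limit - permits.availablePermits();
    }

    // "limit" gauge: the configured maximum.
    long limit() {
        return limit;
    }
}
```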
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java:
##########
@@ -44,10 +45,16 @@ public StartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration conf
CompactionServiceFactory compactionServiceFactory,
BrokerInterceptor brokerInterceptor,
BookKeeperClientFactory bookKeeperClientFactory,
- Function<BrokerService, BrokerService> brokerServiceCustomizer) {
+ Function<BrokerService, BrokerService> brokerServiceCustomizer,
+ PulsarBrokerOpenTelemetry openTelemetry) {
super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
brokerInterceptor, bookKeeperClientFactory);
this.brokerServiceCustomizer = brokerServiceCustomizer;
+ if (openTelemetry != null) {
+ // Replace existing OpenTelemetry wrapper class.
+ this.openTelemetry.close();
Review Comment:
The question here is how much we want to parameterize the `PulsarService`
constructor for testing purposes. At some point we might want to use a builder
for it too :)
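As a rough illustration of the builder idea, assuming a test only needs to override one optional dependency: with a builder, the override is supplied before construction, so no default instance has to be created and then closed, as the subclass above does with `this.openTelemetry.close()`. `TelemetryStub` and `ServiceUnderTest` are hypothetical stand-ins, not Pulsar classes.

```java
// Hypothetical sketch, not Pulsar code.
final class TelemetryStub implements AutoCloseable {
    final String source;
    boolean closed;

    TelemetryStub(String source) {
        this.source = source;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class ServiceUnderTest {
    final String clusterName;
    final TelemetryStub telemetry;

    private ServiceUnderTest(Builder builder) {
        this.clusterName = builder.clusterName;
        // Create the default only when no override was supplied.
        this.telemetry = builder.telemetry != null ? builder.telemetry : new TelemetryStub("default");
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String clusterName = "standalone";
        private TelemetryStub telemetry;

        Builder clusterName(String clusterName) {
            this.clusterName = clusterName;
            return this;
        }

        // Optional test override; production code simply never calls it.
        Builder telemetry(TelemetryStub telemetry) {
            this.telemetry = telemetry;
            return this;
        }

        ServiceUnderTest build() {
            return new ServiceUnderTest(this);
        }
    }
}
```

The trade-off is one more class to maintain in exchange for not growing the constructor's parameter list each time a test needs a new hook.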
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java:
##########
@@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception {
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
+ var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
Review Comment:
If we want to test these metrics within the existing unit tests, we have
limited options for where to insert our validation logic. Point 1 is an
unfortunate consequence. The tests are highly coupled with the implementation,
agreed, but one way or another we need to control the test's code paths,
whether via mocks or intrusive reflection.
Regarding the suggestion: that's roughly what I did initially, mocking
`getBrokerServiceUrlAsync` instead of the semaphore itself. The problem is that
this approach is actually more tightly coupled to the implementation. Since we're
validating the semaphore-related metrics, the mock's scope should be as narrow,
and as close to the semaphore, as possible. Assuming that
`getBrokerServiceUrlAsync` is called while the semaphore is held couples the
test even more deeply to the implementation.
I like the `CountDownLatch` recommendation, I agree it will improve
readability here.
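A sketch of the `CountDownLatch` approach, assuming a plain `Semaphore` stands in for the broker's lookup semaphore: one latch signals that every request holds a permit, and a second keeps them blocked while the usage value is read, which avoids sleeping for a fixed interval. `InFlightHarness` is hypothetical and not the test code from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: keeps `inFlight` requests blocked so a usage value can be asserted.
final class InFlightHarness {
    private InFlightHarness() {}

    static long observeUsageWhileBlocked(int limit, int inFlight) {
        if (inFlight <= 0 || inFlight > limit) {
            throw new IllegalArgumentException("expected 0 < inFlight <= limit");
        }
        Semaphore semaphore = new Semaphore(limit);             // stand-in for the lookup semaphore
        CountDownLatch acquired = new CountDownLatch(inFlight); // reaches 0 once every task holds a permit
        CountDownLatch release = new CountDownLatch(1);         // keeps tasks blocked until we are done
        ExecutorService pool = Executors.newFixedThreadPool(inFlight);
        try {
            for (int i = 0; i < inFlight; i++) {
                pool.submit(() -> {
                    semaphore.acquireUninterruptibly();
                    try {
                        acquired.countDown();
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        semaphore.release();
                    }
                });
            }
            if (!acquired.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not acquire permits in time");
            }
            // Equivalent of reading the "usage" gauge while all requests are pending.
            return limit - semaphore.availablePermits();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```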
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java:
##########
@@ -727,11 +733,24 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
.equals(PulsarCompactionServiceFactory.class.getName())) {
compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
}
+ PulsarBrokerOpenTelemetry pulsarBrokerOpenTelemetry;
+ if (builder.enableOpenTelemetry) {
+ var reader = InMemoryMetricReader.create();
+ pulsarBrokerOpenTelemetry = new PulsarBrokerOpenTelemetry(builder.config, builderCustomizer -> {
+ builderCustomizer.addMeterProviderCustomizer(
+ (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
+ builderCustomizer.addPropertiesSupplier(
+ () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"));
+ });
+ openTelemetryMetricReader(reader);
Review Comment:
This is a consequence of using the `lombok` builder.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java:
##########
@@ -34,10 +37,17 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
private final Meter meter;
public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+ this(config, null);
+ }
+
+ @VisibleForTesting
+ public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
+ Consumer<AutoConfiguredOpenTelemetrySdkBuilder> builderCustomizer) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
+ .sdkBuilderConsumer(builderCustomizer)
Review Comment:
The `lombok` `@Builder` annotation determines the names of these methods. If I
changed this one, it would be reasonable to change the others too
(`setClusterName`, `setServiceName`, etc.). That doesn't look like a worthwhile
change, and we'd be deviating from the conventions used by other builder
classes in Pulsar.
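For readers less familiar with Lombok: by default `@Builder` generates one fluent method per field, named exactly after the field, so a field `sdkBuilderConsumer` yields `sdkBuilderConsumer(...)` rather than `setSdkBuilderConsumer(...)`. The hand-written class below approximates that shape without depending on Lombok; `TelemetryServiceOptions` is hypothetical and is not the generated code itself.

```java
// Hand-written approximation of the default @Builder shape (no Lombok dependency).
// Field names mirror the diff; the class itself is hypothetical.
final class TelemetryServiceOptions {
    final String clusterName;
    final String serviceName;
    final String serviceVersion;

    private TelemetryServiceOptions(String clusterName, String serviceName, String serviceVersion) {
        this.clusterName = clusterName;
        this.serviceName = serviceName;
        this.serviceVersion = serviceVersion;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String clusterName;
        private String serviceName;
        private String serviceVersion;

        // Named after the field, as Lombok would: clusterName(...), not setClusterName(...).
        Builder clusterName(String clusterName) {
            this.clusterName = clusterName;
            return this;
        }

        Builder serviceName(String serviceName) {
            this.serviceName = serviceName;
            return this;
        }

        Builder serviceVersion(String serviceVersion) {
            this.serviceVersion = serviceVersion;
            return this;
        }

        TelemetryServiceOptions build() {
            return new TelemetryServiceOptions(clusterName, serviceName, serviceVersion);
        }
    }
}
```

Lombok's `@Builder` also has a `setterPrefix` attribute that would produce `setClusterName(...)`-style names, but applying it here would diverge from the other Pulsar builders.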
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
+ @Override
+ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
Review Comment:
Similar to the previous one, this results from using `lombok` annotations. I
could add an overload without any parameters, but IMO that defeats the purpose
of the annotation.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
+ @Override
+ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
+ }
+
/**
- * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
- *
- * @throws Exception
+ * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore, as well
+ * as the related limit metric value.
*/
@Test
public void testThrottlingLookupRequestSemaphore() throws Exception {
- BrokerService service = pulsar.getBrokerService();
- assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
- admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0));
- Thread.sleep(1000);
- assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
+ throtllingRequestSemaphoresTestHelper(
Review Comment:
Refactored this, thanks!
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -175,6 +187,25 @@ public NamespaceService(PulsarService pulsar) {
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);
+
+ var meter = pulsar.getOpenTelemetry().getMeter();
+ this.lookupRedirectsCounter = meter
+ .counterBuilder("pulsar.broker.lookup.redirect")
+ .setDescription("The number of lookup redirected requests")
+ .build();
+ this.lookupFailuresCounter = meter
+ .counterBuilder("pulsar.broker.lookup.failure")
+ .setDescription("The number of lookup failures")
+ .build();
+ this.lookupAnswersCounter = meter
+ .counterBuilder("pulsar.broker.lookup.answer")
+ .setDescription("The number of lookup responses (i.e. not redirected requests)")
+ .build();
+ this.lookupLatencyHistogram = meter
+ .histogramBuilder("pulsar.broker.lookup.latency")
Review Comment:
Good suggestion. Note that this would split the latency numbers by the
respective attributes, so users would have to aggregate them back together to
get what `pulsar_broker_lookup` currently expresses. @merlimat @lhotari,
thoughts on this?
Regarding the response type attribute names, perhaps `broker` and `redirect`
are a better fit? Adding the `_url` suffix implies that the value holds the
actual response URL.
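A small stdlib-only model of what the split implies for consumers: each attribute value gets its own series, and the unsplit view is recovered by summing across them. The `ResponseType` enum (using the suggested `broker`/`redirect` values) and the `Stats` holder are hypothetical; with the OpenTelemetry API, the split itself would come from passing `Attributes` to `DoubleHistogram.record(double, Attributes)`.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model: lookup latency kept per response type, plus the re-aggregation
// a user would need to recover a single unsplit series.
final class LatencyByResponseType {
    enum ResponseType { BROKER, REDIRECT }

    // Minimal stand-in for a histogram's count and sum; real histograms also keep buckets.
    static final class Stats {
        long count;
        double sumSeconds;
    }

    private final Map<ResponseType, Stats> byType = new EnumMap<>(ResponseType.class);

    void record(ResponseType type, double seconds) {
        Stats s = byType.computeIfAbsent(type, t -> new Stats());
        s.count++;
        s.sumSeconds += seconds;
    }

    Stats stats(ResponseType type) {
        return byType.getOrDefault(type, new Stats());
    }

    // Summing across attribute values recovers the unsplit count and sum
    // (bucket counts can be summed the same way when bucket boundaries match).
    Stats total() {
        Stats total = new Stats();
        for (Stats s : byType.values()) {
            total.count += s.count;
            total.sumSeconds += s.sumSeconds;
        }
        return total;
    }
}
```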
--
This is an automated message from the Apache Git Service.