asafm commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1493809702
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -205,16 +236,21 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
});
future.thenAccept(optResult -> {
- lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ var latencyNs = System.nanoTime() - startTime;
+ lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
+ lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS));
Review Comment:
Instead of this, how about:
`lookupLatencyHistogram.record(NANOSECONDS.toSeconds(latencyNs))`
This way we don't need `MetricsUtil`, since this conversion already exists in the `TimeUnit` enum.
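One detail worth checking before switching: `TimeUnit.NANOSECONDS.toSeconds(...)` returns a `long` truncated toward zero, so a 25 ms lookup would be recorded as `0` seconds in the histogram. The hypothetical `LatencyConversion` class below contrasts the two conversions using only the JDK; it makes no assumption about how `MetricsUtil.convertToSeconds` is implemented.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class LatencyConversion {
    // TimeUnit conversions return a long and truncate toward zero,
    // so any sub-second part of the latency is dropped.
    static long truncatedSeconds(long latencyNs) {
        return TimeUnit.NANOSECONDS.toSeconds(latencyNs);
    }

    // Keeps the fractional part by dividing by the number of nanoseconds in one second.
    static double fractionalSeconds(long latencyNs) {
        return latencyNs / (double) TimeUnit.SECONDS.toNanos(1);
    }
}
```

If sub-second resolution is wanted (lookup latencies are often well under a second), a `double` conversion like `fractionalSeconds` keeps it while still relying on `TimeUnit` for the constant.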
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##########
@@ -312,6 +313,8 @@ public PulsarService(ServiceConfiguration config,
TransactionBatchedWriteValidator.validate(config);
this.config = config;
+ this.openTelemetry = new PulsarBrokerOpenTelemetry(config);
Review Comment:
If we are initializing it in the constructor, can we make it `final`?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -175,6 +187,25 @@ public NamespaceService(PulsarService pulsar) {
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);
+
+ var meter = pulsar.getOpenTelemetry().getMeter();
+ this.lookupRedirectsCounter = meter
+ .counterBuilder("pulsar.broker.lookup.redirect")
+ .setDescription("The number of lookup redirected requests")
+ .build();
+ this.lookupFailuresCounter = meter
+ .counterBuilder("pulsar.broker.lookup.failure")
+ .setDescription("The number of lookup failures")
+ .build();
+ this.lookupAnswersCounter = meter
+ .counterBuilder("pulsar.broker.lookup.answer")
+ .setDescription("The number of lookup responses (i.e. not redirected requests)")
+ .build();
+ this.lookupLatencyHistogram = meter
+ .histogramBuilder("pulsar.broker.lookup.latency")
Review Comment:
I think it makes more sense to have a single instrument that holds all this data:
Name = `pulsar.broker.lookup.request.duration`
Attributes:
* `pulsar.lookup.response.type`:
  * `broker_url`
  * `redirect_url`
* `pulsar.response.status`:
  * `success`
  * `failure`

Why?
* A histogram already contains a `count`, so we don't need a separate counter. This prevents data duplication, since the histogram emits a count anyway.
* It makes more sense to express the different traits of the response as attributes rather than in the instrument name. That enables an overarching look at lookups as a whole, and then, if one wishes, one can slice and dice accordingly, very quickly. When the traits are split into separate instruments, it is hard to know the total lookup request count.
* I prefix the attributes because, when I think about the user, I remember how hard it is to tell apart attributes coming from Kubernetes, the host, and the application itself (Pulsar). Here, we make it very clear that one attribute is specific to lookup, while the other (`response.status`) is relevant to any request to Pulsar, meaning there's a good chance we'll reuse it in other request metrics.
* I decided to use `response.type` with the values `broker_url` and `redirect_url` because, to me, they are much more straightforward than "answer" and "redirect".
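To make the "count for free" argument concrete, here is a toy, JDK-only model of a single duration instrument carrying the two suggested attributes. It is not the OpenTelemetry SDK (there, the equivalent call would be `DoubleHistogram.record(double, Attributes)`); it only shows that one recorded data set answers both the total-count question and any per-attribute slice. The class and enum names are hypothetical, and recording a failed lookup without a response type is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of ONE duration histogram with two attributes; not the OpenTelemetry SDK.
class LookupDurationModel {
    enum ResponseType { BROKER_URL, REDIRECT_URL } // pulsar.lookup.response.type
    enum Status { SUCCESS, FAILURE }               // pulsar.response.status

    static final class Point {
        final ResponseType type; // null for a failed lookup in this sketch (assumption)
        final Status status;
        final double seconds;

        Point(ResponseType type, Status status, double seconds) {
            this.type = type;
            this.status = status;
            this.seconds = seconds;
        }
    }

    private final List<Point> points = new ArrayList<>();

    void record(double seconds, ResponseType type, Status status) {
        points.add(new Point(type, status, seconds));
    }

    // The histogram's own count gives the total number of lookups...
    long count() {
        return points.size();
    }

    // ...and any slice is a filter over attributes, so no extra counter is needed.
    long count(Predicate<Point> filter) {
        return points.stream().filter(filter).count();
    }
}
```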
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();
this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- - lookupRequestSemaphore.get().availablePermits())
+ .supplier(this::getPendingLookupRequest)
.register();
+ this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+ .ofLongs()
+ .setDescription("The number of pending lookup requests in the broker. "
Review Comment:
We should also define the unit: `{request}`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();
this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- - lookupRequestSemaphore.get().availablePermits())
+ .supplier(this::getPendingLookupRequest)
.register();
+ this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+ .ofLongs()
+ .setDescription("The number of pending lookup requests in the broker. "
+ + "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ + "new requests are rejected.")
+ .buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
+ this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.limit")
+ .ofLongs()
+ .setDescription("The maximum number of pending lookup requests in the broker. "
+ + "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.")
+ .buildWithCallback(
+ measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));
this.pendingTopicLoadRequests = ObserverGauge.build(
- "pulsar_broker_topic_load_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- - topicLoadRequestSemaphore.get().availablePermits())
+ "pulsar_broker_topic_load_pending_requests", "-")
+ .supplier(this::getPendingTopicLoadRequests)
.register();
+ this.pendingTopicLoadRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.topic.load.pending.request.usage")
Review Comment:
I suggest the names:
* `pulsar.broker.topic.load.operation.pending.usage`
* `pulsar.broker.topic.load.operation.pending.limit`
Why?
* I think loading a topic is not a request but an operation. Contrast that with a lookup request, which actually is a request (coming, for example, from a client or the Admin API).
* I think `pending` should come after the thing it describes.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();
this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- - lookupRequestSemaphore.get().availablePermits())
+ .supplier(this::getPendingLookupRequest)
.register();
+ this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
Review Comment:
I would like to align this naming with the names I suggested in the previous comment, since OTel strongly recommends structuring metric names as a proper hierarchy.
I suggest:
* `pulsar.broker.lookup.request.pending.usage`
* `pulsar.broker.lookup.request.pending.limit`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java:
##########
@@ -34,10 +37,17 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
private final Meter meter;
public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
+ this(config, null);
+ }
+
+ @VisibleForTesting
+ public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
+ Consumer<AutoConfiguredOpenTelemetrySdkBuilder> builderCustomizer) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
+ .sdkBuilderConsumer(builderCustomizer)
Review Comment:
I just noticed this: `sdkBuilderConsumer` --> `setBuilderCustomizer`
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java:
##########
@@ -178,6 +188,49 @@ public void testMultipleBrokerLookup() throws Exception {
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));
+ var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
Review Comment:
I have two big problems with this test block:
1. It's stuck in the middle of a test method that tests something completely different. Suddenly, in the middle of it, we're setting up and testing something else, which ruins the readability of the original test method.
2. It's unreadable and so tightly coupled to the implementation that I can barely call it a test. I would use this technique only as a last resort.

My question is: can we use `spy` and the Pulsar service customizer so that when `NamespaceService.getBrokerServiceUrlAsync` is invoked, it actually gets stuck on something like a `CountDownLatch`? Then you grab the pending metric, see that it's 1, and release the latch.
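The latch-based approach can be sketched with plain JDK types. `BlockingLookupService` below is a hypothetical stand-in, not `NamespaceService`: it takes a permit from a `Semaphore` (the pending count is computed as limit minus available permits, the same formula as the supplier in the diff), then parks the lookup on a `CountDownLatch` until the test releases it. The broker URL it returns is a made-up placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the broker's lookup path; NOT NamespaceService itself.
class BlockingLookupService {
    private final int maxConcurrentLookups;
    private final Semaphore lookupSemaphore;
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch release = new CountDownLatch(1);

    BlockingLookupService(int maxConcurrentLookups) {
        this.maxConcurrentLookups = maxConcurrentLookups;
        this.lookupSemaphore = new Semaphore(maxConcurrentLookups);
    }

    // Limit minus free permits, mirroring the pending-lookup supplier in the diff.
    long pendingLookups() {
        return maxConcurrentLookups - lookupSemaphore.availablePermits();
    }

    // Takes a permit, then blocks the lookup on a latch until the test releases it.
    CompletableFuture<String> lookupAsync(Executor executor) {
        if (!lookupSemaphore.tryAcquire()) {
            return CompletableFuture.failedFuture(new RejectedExecutionException("too many lookups"));
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                started.countDown();
                release.await();
                return "pulsar://broker-1:6650";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            } finally {
                lookupSemaphore.release(); // runs before the future completes
            }
        }, executor);
    }

    boolean awaitLookupStarted(long timeoutSeconds) {
        try {
            return started.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void releaseLookup() {
        release.countDown();
    }
}
```

In the real test, the blocking would live in a `spy` stub of `NamespaceService.getBrokerServiceUrlAsync`, and the assertion would read the pending-lookup metric from the metric reader; that wiring depends on the test harness and isn't shown here.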
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();
this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- - lookupRequestSemaphore.get().availablePermits())
+ .supplier(this::getPendingLookupRequest)
.register();
+ this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
Review Comment:
According to the [OpenTelemetry specification for Gauge](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-gauge):
> Asynchronous Gauge is an [asynchronous Instrument](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-instrument-api) which reports non-additive value(s) (e.g. the room temperature - it makes no sense to report the temperature value from multiple rooms and sum them up) when the instrument is being observed.
>
> Note: if the values are additive (e.g. the process heap size - it makes sense to report the heap size from multiple processes and sum them up, so we get the total heap usage), use [Asynchronous Counter](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-counter) or [Asynchronous UpDownCounter](https://opentelemetry.io/docs/specs/otel/metrics/api/#asynchronous-updowncounter).

Hence this should be an UpDownCounter: the number of pending lookup requests is additive across brokers, and it can go down as well as up.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -403,15 +411,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();
this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
- .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- - lookupRequestSemaphore.get().availablePermits())
+ .supplier(this::getPendingLookupRequest)
.register();
+ this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.usage")
+ .ofLongs()
+ .setDescription("The number of pending lookup requests in the broker. "
+ + "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ + "new requests are rejected.")
+ .buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
+ this.pendingLookupRequestsLimit = pulsar.getOpenTelemetry().getMeter()
+ .gaugeBuilder("pulsar.broker.lookup.pending.request.limit")
+ .ofLongs()
+ .setDescription("The maximum number of pending lookup requests in the broker. "
Review Comment:
Also add the unit here.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -241,8 +243,15 @@ public class BrokerService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
+ @PulsarDeprecatedMetric(newMetricName = "pulsar.broker.lookup.pending.request.usage")
private final ObserverGauge pendingLookupRequests;
+ private final ObservableLongGauge pendingLookupRequestsCounter;
+ private final ObservableLongGauge pendingLookupRequestsLimit;
+
+ @PulsarDeprecatedMetric(newMetricName = "pulsar.broker.topic.load.pending.request.usage")
private final ObserverGauge pendingTopicLoadRequests;
+ private final ObservableLongGauge pendingTopicLoadRequestsCounter;
Review Comment:
`pendingTopicLoadRequestsCounter` -> `pendingTopicLoadRequestsGauge`
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -241,8 +243,15 @@ public class BrokerService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
+ @PulsarDeprecatedMetric(newMetricName = "pulsar.broker.lookup.pending.request.usage")
private final ObserverGauge pendingLookupRequests;
+ private final ObservableLongGauge pendingLookupRequestsCounter;
+ private final ObservableLongGauge pendingLookupRequestsLimit;
+
+ @PulsarDeprecatedMetric(newMetricName = "pulsar.broker.topic.load.pending.request.usage")
private final ObserverGauge pendingTopicLoadRequests;
+ private final ObservableLongGauge pendingTopicLoadRequestsCounter;
+ private final ObservableLongGauge pendingTopicLoadRequestsLimit;
Review Comment:
`pendingTopicLoadRequestsLimit` -> `pendingTopicLoadRequestsLimitGauge`
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
+ @Override
+ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
Review Comment:
Why not `builder.enableOpenTelemetry()`?
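A no-arg convenience method can simply delegate to the `boolean` setter, which keeps the call site reading as `builder.enableOpenTelemetry()`. The builder below is a hypothetical, JDK-only stand-in; whether the real `PulsarTestContext.Builder` is hand-written or generated, and where such an overload would go, are assumptions not confirmed by this diff.

```java
// Hypothetical builder, not PulsarTestContext.Builder.
class TestContextBuilder {
    private boolean enableOpenTelemetry;

    // Explicit setter, still useful when the flag comes from a variable.
    TestContextBuilder enableOpenTelemetry(boolean enabled) {
        this.enableOpenTelemetry = enabled;
        return this;
    }

    // Convenience overload for the common case of just turning the feature on.
    TestContextBuilder enableOpenTelemetry() {
        return enableOpenTelemetry(true);
    }

    boolean isOpenTelemetryEnabled() {
        return enableOpenTelemetry;
    }
}
```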
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java:
##########
@@ -727,11 +733,24 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
.equals(PulsarCompactionServiceFactory.class.getName())) {
compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
}
+ PulsarBrokerOpenTelemetry pulsarBrokerOpenTelemetry;
+ if (builder.enableOpenTelemetry) {
+ var reader = InMemoryMetricReader.create();
+ pulsarBrokerOpenTelemetry = new PulsarBrokerOpenTelemetry(builder.config, builderCustomizer -> {
+ builderCustomizer.addMeterProviderCustomizer(
+ (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
+ builderCustomizer.addPropertiesSupplier(
+ () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"));
+ });
+ openTelemetryMetricReader(reader);
Review Comment:
I don't understand this statement: `openTelemetryMetricReader` is a variable, so why does this look like a function call?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java:
##########
@@ -66,18 +70,54 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
+ }
+
/**
- * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
- *
- * @throws Exception
+ * Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore, as well
+ * as the related limit metric value.
*/
@Test
public void testThrottlingLookupRequestSemaphore() throws Exception {
- BrokerService service = pulsar.getBrokerService();
- assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
- admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0));
- Thread.sleep(1000);
- assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
+ throtllingRequestSemaphoresTestHelper(
Review Comment:
When reading this, you can't tell what is being tested here. We call a helper that is supposed to help with testing `throttlingRequestSemaphore`.
I'd prefer to take the 10 lines of code in that helper method, turn them into about 3 helper methods such as `setSemaphore()` and `assertMetricHasValue()`, and then use those in each test method, so that in the end I can read through the test scenario.
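A minimal sketch of that decomposition, using only JDK types. `FakeBroker` is a hypothetical stand-in for the broker state the test touches (not `BrokerService`), the metric snapshot is a plain map rather than an OpenTelemetry reader, and the metric names are the ones suggested earlier in this review. The point is only the shape: small single-purpose helpers, and a test method that reads as a scenario.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the broker state this test touches; not Pulsar's BrokerService.
class FakeBroker {
    final AtomicReference<Semaphore> lookupRequestSemaphore = new AtomicReference<>(new Semaphore(50));
    private int maxConcurrentLookupRequest = 50;

    // Mimics a dynamic-config update: store the new limit and swap in a matching semaphore.
    void updateMaxConcurrentLookupRequest(int limit) {
        maxConcurrentLookupRequest = limit;
        lookupRequestSemaphore.set(new Semaphore(limit));
    }

    // Hypothetical metric snapshot keyed by instrument name.
    Map<String, Long> collectMetrics() {
        Map<String, Long> metrics = new HashMap<>();
        metrics.put("pulsar.broker.lookup.request.pending.limit", (long) maxConcurrentLookupRequest);
        metrics.put("pulsar.broker.lookup.request.pending.usage",
                (long) (maxConcurrentLookupRequest - lookupRequestSemaphore.get().availablePermits()));
        return metrics;
    }
}

class ThrottlingTestSketch {
    static void setMaxConcurrentLookupRequest(FakeBroker broker, int limit) {
        broker.updateMaxConcurrentLookupRequest(limit);
    }

    static void assertSemaphorePermits(FakeBroker broker, int expected) {
        int actual = broker.lookupRequestSemaphore.get().availablePermits();
        if (actual != expected) {
            throw new AssertionError("permits: expected " + expected + " but was " + actual);
        }
    }

    static void assertMetricHasValue(FakeBroker broker, String name, long expected) {
        Long actual = broker.collectMetrics().get(name);
        if (actual == null || actual != expected) {
            throw new AssertionError(name + ": expected " + expected + " but was " + actual);
        }
    }

    // The scenario reads top to bottom: initial state, update, then check semaphore and metric.
    static void testThrottlingLookupRequestSemaphore() {
        FakeBroker broker = new FakeBroker();
        assertSemaphorePermits(broker, 50);
        setMaxConcurrentLookupRequest(broker, 0);
        assertSemaphorePermits(broker, 0);
        assertMetricHasValue(broker, "pulsar.broker.lookup.request.pending.limit", 0);
    }
}
```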
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java:
##########
@@ -44,10 +45,16 @@ public StartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration conf
CompactionServiceFactory compactionServiceFactory,
BrokerInterceptor brokerInterceptor,
BookKeeperClientFactory bookKeeperClientFactory,
- Function<BrokerService, BrokerService> brokerServiceCustomizer) {
+ Function<BrokerService, BrokerService> brokerServiceCustomizer,
+ PulsarBrokerOpenTelemetry openTelemetry) {
super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
brokerInterceptor, bookKeeperClientFactory);
this.brokerServiceCustomizer = brokerServiceCustomizer;
+ if (openTelemetry != null) {
+ // Replace existing OpenTelemetry wrapper class.
+ this.openTelemetry.close();
Review Comment:
This looked different from the other fields in PulsarService when I examined this class. Why isn't it instantiated the way we want it for the test?
@lhotari, can you take a closer look here?
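One possible way to address this, and also the earlier request to make `openTelemetry` `final`, is constructor injection: the production constructor builds the wrapper itself, and an extra constructor lets the test supply one, so nothing needs to be closed and replaced afterwards. The classes below are hypothetical stand-ins for `PulsarService` and `PulsarBrokerOpenTelemetry`, written with plain JDK types; this is a sketch of the pattern, not the actual Pulsar API.

```java
import java.io.Closeable;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for PulsarBrokerOpenTelemetry.
class TelemetryWrapper implements Closeable {
    final String clusterName;
    boolean closed;

    TelemetryWrapper(String clusterName) {
        this.clusterName = clusterName;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical stand-in for PulsarService: the field stays final because it is assigned
// exactly once, and tests choose the instance up front instead of replacing it later.
class ServiceWithTelemetry implements Closeable {
    private final TelemetryWrapper openTelemetry;

    // Production path: build the wrapper from the configuration.
    ServiceWithTelemetry(String clusterName) {
        this(clusterName, TelemetryWrapper::new);
    }

    // Test seam: the caller decides how the wrapper is built.
    ServiceWithTelemetry(String clusterName, Function<String, TelemetryWrapper> telemetryFactory) {
        this.openTelemetry = Objects.requireNonNull(telemetryFactory.apply(clusterName));
    }

    TelemetryWrapper getOpenTelemetry() {
        return openTelemetry;
    }

    @Override
    public void close() {
        openTelemetry.close();
    }
}
```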
--
This is an automated message from the Apache Git Service.