asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1510298288
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
*/
ClientBuilder enableBusyWait(boolean enableBusyWait);
+ ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+
+ ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality);
Review Comment:
As discussed in the community bi-weekly meeting, I prefer to avoid any
"add-ons" / wrappers around OpenTelemetry and use it as plainly as possible to
keep things simple.
For this point specifically, we can record the metrics using *all* the
attributes we want, including the high-cardinality ones such as topic.
OpenTelemetry
[added](https://github.com/open-telemetry/opentelemetry-specification/pull/3546/files)
an experimental metrics advice API to the specification, allowing an
instrument to specify the default attributes for the default view. This means
we can record using both namespace and topic attributes, while the default view
keeps only namespace.
This was implemented in the Java SDK, in a separate interface and
artifact that we can depend on.
```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.DoubleCounterBuilder;
import java.util.List;

/** Extended {@link DoubleCounterBuilder} with experimental APIs. */
public interface ExtendedDoubleCounterBuilder extends DoubleCounterBuilder {
  /**
   * Specify the attribute advice, which suggests the recommended set of attribute keys to be used
   * for this counter.
   */
  default ExtendedDoubleCounterBuilder setAttributesAdvice(List<AttributeKey<?>> attributes) {
    return this;
  }
}
```
The issue that started all of this in December 2022:
https://github.com/open-telemetry/opentelemetry-specification/issues/3061.
Given all of this, I would remove `openTelemetryMetricsCardinality()` from the builder.
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
*/
ClientBuilder enableBusyWait(boolean enableBusyWait);
+ ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
Review Comment:
This supports the use case where the user chose OpenTelemetry to
define/export their metrics, *and* they have done so explicitly since they have
an `OpenTelemetry` instance they can pass to the Pulsar client.
There is also the possibility of a user just installing the OTel Java
agent (via JVM command-line parameters). When that is the case, the Java agent
builds an `OpenTelemetry` instance and registers it in
`GlobalOpenTelemetry`. Hence in the `ClientBuilder.build()` implementation, we
can default to `GlobalOpenTelemetry.get()` in case the user has not supplied
the `OpenTelemetry` instance, and then the Java agent use case will just work:
metrics will be registered and exported.
If the Java agent was not installed AND the `openTelemetry()` method wasn't
used, defaulting to `GlobalOpenTelemetry.get()` will simply give us a no-op
`OpenTelemetry`.
If the user decided to register their instance in `GlobalOpenTelemetry`
explicitly, that's also fine: we'll just use it, even though they didn't pass
the `OpenTelemetry` instance to the builder.
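A minimal sketch of that fallback, assuming the resolved instance is computed once inside `build()`. `OpenTelemetryDefaults` and `resolve` are hypothetical names, not existing Pulsar code:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;

// Hypothetical helper; the real ClientBuilderImpl.build() may be structured differently.
final class OpenTelemetryDefaults {
    private OpenTelemetryDefaults() {
    }

    // Prefer the instance passed to ClientBuilder.openTelemetry(...). Otherwise fall back to
    // GlobalOpenTelemetry, which the OTel Java agent populates when it is installed and which
    // yields a no-op instance when nothing has been registered.
    static OpenTelemetry resolve(OpenTelemetry explicitlySupplied) {
        return explicitlySupplied != null ? explicitlySupplied : GlobalOpenTelemetry.get();
    }
}
```

One thing worth verifying against the OTel Java version in use: as far as I know, `GlobalOpenTelemetry.get()` locks in the no-op instance when nothing has been registered yet, so an application that calls `GlobalOpenTelemetry.set(...)` only after the client is built would get an exception.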
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
/**
- * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
+ * Set the interval between each stat info <i>(default: disabled)</i> Stats will be activated with positive
Review Comment:
On the server side, we have marked OpenTelemetry as experimental since we
might get some feedback and decide to name things differently. For example, we
might change something on the server side and then decide to change it on the
client side as well. If we mark the client-side OTel metrics as experimental
too, we keep the freedom to change them.
If we do so, maybe it's better to leave the 60-second default as is for now.
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java:
##########
@@ -29,9 +30,12 @@
*
* <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
* {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ *
+ * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats
Review Comment:
I think it's very important to have documentation additions that will cover
the following:
1. The new metrics information, including name, unit, attributes, and type
of instrument.
2. How to use. Not many people are familiar with OpenTelemetry:
- Linking to appropriate places for configuration. Maybe a short
description of the possibilities?
- Explanation that there are existing bridges if they are using
Dropwizard or Micrometer.
- Last resort: define your own MetricReader to plug into your own metrics system.
All of those, in my opinion, should be in the documentation.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+ "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+ consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+ "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+ messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
Review Comment:
1. Why at the client level and not at the topic level? Having that per
(broker, namespace, topic) makes sense, no?
Now I see there was a line that filled in the attributes. Again, I'm all
for setting them explicitly, so they are easy to spot when you read the code
after finding a certain metric.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java:
##########
@@ -121,6 +123,18 @@ public ClientBuilder authentication(Authentication authentication) {
return this;
}
+ @Override
+ public ClientBuilder openTelemetry(OpenTelemetry openTelemetry) {
+ conf.setOpenTelemetry(openTelemetry);
Review Comment:
Check later whether there is a null check inside.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
this.idleState = new ClientCnxIdleState(this);
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+ this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+ "Counter of connections opened", Attributes.empty());
Review Comment:
For description, how about "The number of connections opened"?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
+
+ Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+ histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
Review Comment:
Is the instrument the duration of a lookup request? If so, it should be
suffixed with `.duration`, i.e., `pulsar.client.lookup.duration`.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+ "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+ consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+ "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+ messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+ "Number of messages received", attrs);
+ bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
Review Comment:
The bytes and messages counters both use the same instrument name.
How about:
- `pulsar.client.consumer.message.received.count`
- `pulsar.client.consumer.message.received.size`
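A sketch of the split, assuming the OpenTelemetry Java API (`io.opentelemetry.api`) is used directly. `ConsumerReceiveMetrics` and its method are hypothetical, and the units follow the OTel conventions of `{message}` for a count of messages and `By` for bytes:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical holder for the two consumer-receive instruments suggested above.
final class ConsumerReceiveMetrics {
    static final String COUNT_NAME = "pulsar.client.consumer.message.received.count";
    static final String SIZE_NAME = "pulsar.client.consumer.message.received.size";

    private final LongCounter receivedCount;
    private final LongCounter receivedSize;

    ConsumerReceiveMetrics(Meter meter) {
        receivedCount = meter.counterBuilder(COUNT_NAME)
                .setUnit("{message}")
                .setDescription("The number of messages explicitly received by the consumer")
                .build();
        receivedSize = meter.counterBuilder(SIZE_NAME)
                .setUnit("By")
                .setDescription("The number of bytes explicitly received by the consumer")
                .build();
    }

    // One received message adds 1 to the count instrument and its payload size to the size one.
    void onMessageReceived(int payloadBytes, Attributes attributes) {
        receivedCount.add(1, attributes);
        receivedSize.add(payloadBytes, attributes);
    }
}
```

Two distinct names let each instrument carry its own unit, which a single shared name cannot do.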
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+ "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+ consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+ "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+ messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+ "Number of messages received", attrs);
+ bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+ "Bytes received", attrs);
+ messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
+ "Number of messages currently sitting in the consumer pre-fetch queue", attrs);
+ bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched", Unit.Bytes,
+ "Total number of bytes currently sitting in the consumer pre-fetch queue", attrs);
+
+ consumerAcksCounter = ip.newCounter("pulsar.client.consumer.ack", Unit.Messages,
Review Comment:
1. The unit is Messages, but the instrument name suggests it counts ack operations.
2. How about:
- `pulsar.client.consumer.message.ack`
- `pulsar.client.consumer.message.nack`
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -1228,7 +1228,10 @@ public int getTotalIncomingMessages() {
protected void clearIncomingMessages() {
// release messages if they are pooled messages
- incomingMessages.forEach(Message::release);
+ incomingMessages.forEach(msg -> {
Review Comment:
Why expand the `Message::release` method reference into a lambda?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+ "Publish latency experienced by the application, includes client batching time", attrs);
+ rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+ "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+ publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
+ Unit.Bytes, "Bytes published", attrs);
+ pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,
Review Comment:
I suggest:
- `pulsar.client.producer.message.pending.count`
- `pulsar.client.producer.message.pending.size`
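Since pending messages go both up and down, an `UpDownCounter` fits these names. A sketch assuming the plain OTel Java API; `ProducerPendingMetrics` and its callbacks are hypothetical, and it assumes "pending" means sent but not yet acknowledged or failed:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical holder for the producer's pending-message instruments.
final class ProducerPendingMetrics {
    static final String PENDING_COUNT = "pulsar.client.producer.message.pending.count";
    static final String PENDING_SIZE = "pulsar.client.producer.message.pending.size";

    private final LongUpDownCounter pendingCount;
    private final LongUpDownCounter pendingSize;
    private final Attributes attributes;

    ProducerPendingMetrics(Meter meter, Attributes attributes) {
        this.attributes = attributes;
        pendingCount = meter.upDownCounterBuilder(PENDING_COUNT)
                .setUnit("{message}")
                .setDescription("The number of messages waiting for a broker acknowledgment")
                .build();
        pendingSize = meter.upDownCounterBuilder(PENDING_SIZE)
                .setUnit("By")
                .setDescription("The number of bytes waiting for a broker acknowledgment")
                .build();
    }

    // Called when a message enters the pending queue.
    void onEnqueued(long bytes) {
        pendingCount.add(1, attributes);
        pendingSize.add(bytes, attributes);
    }

    // Called when the broker acknowledges the message or the send fails for good.
    void onAckedOrFailed(long bytes) {
        pendingCount.add(-1, attributes);
        pendingSize.add(-bytes, attributes);
    }
}
```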
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
+
+ Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+ histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
Review Comment:
Note: should it be `pulsar.client.operations.lookup`, i.e., prefixed with
`operations`, since we'll have more of those below?
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java:
##########
@@ -137,7 +139,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
conf.setMaxLookupRedirects(10);
@Cleanup
- LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
+ LookupService lookupService = useHttp ? new HttpLookupService(new InstrumentProvider(new ClientConfigurationData()), conf, eventExecutors) :
Review Comment:
Check what `InstrumentProvider` is.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
this.idleState = new ClientCnxIdleState(this);
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+ this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+ "Counter of connections opened", Attributes.empty());
+ this.connectionsClosedCounter = instrumentProvider.newCounter("pulsar.client.connections.closed", Unit.Connections,
Review Comment:
Shouldn't we include target details? Perhaps Cluster or Broker?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+ "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+ consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+ "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+ messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+ "Number of messages received", attrs);
+ bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+ "Bytes received", attrs);
+ messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
Review Comment:
"preteched" --> "prefetched"
How about:
- `pulsar.client.consumer.message.prefetched.count`
- `pulsar.client.consumer.message.prefetched.size`
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
+
+ Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+ histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+ "Lookup operations",
+ attrs.toBuilder().put("type", "topic").build());
+ histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+ "Lookup operations",
Review Comment:
How about "Duration of lookup operations"?
(I have a comment below asking how this can be, since these seem to be
durations of 4 different types of binary commands.)
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+ private final Meter meter;
+ private final MetricsCardinality metricsCardinality;
+
+ public InstrumentProvider(ClientConfigurationData conf) {
+ this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+ this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+ }
+
+ public Attributes getAttributes(String topic) {
+ if (metricsCardinality == MetricsCardinality.None) {
+ return Attributes.empty();
+ }
+
+ AttributesBuilder ab = Attributes.builder();
+ TopicName tn = TopicName.get(topic);
+
+ switch (metricsCardinality) {
+ case Partition:
+ if (tn.isPartitioned()) {
+ ab.put("partition", tn.getPartitionIndex());
+ }
+ // fallthrough
+ case Topic:
+ ab.put("topic", tn.getPartitionedTopicName());
+ // fallthrough
+ case Namespace:
+ ab.put("namespace", tn.getNamespace());
+ // fallthrough
+ case Tenant:
+ ab.put("tenant", tn.getTenant());
Review Comment:
`pulsar.tenant`
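A sketch of `pulsar.`-prefixed attribute keys declared once as `AttributeKey` constants. The class, the method, and the `pulsar.partition` key are hypothetical, and callers are assumed to pass tenant, namespace, and topic already parsed:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;

// Hypothetical constants; keys follow the "pulsar." prefix suggested in these review comments.
final class PulsarAttributes {
    static final AttributeKey<String> TENANT = AttributeKey.stringKey("pulsar.tenant");
    static final AttributeKey<String> NAMESPACE = AttributeKey.stringKey("pulsar.namespace");
    static final AttributeKey<String> TOPIC = AttributeKey.stringKey("pulsar.topic");
    static final AttributeKey<Long> PARTITION = AttributeKey.longKey("pulsar.partition");

    private PulsarAttributes() {
    }

    // Builds the full attribute set; a negative partitionIndex means "not a partition".
    static Attributes forTopic(String tenant, String namespace, String topic, int partitionIndex) {
        AttributesBuilder builder = Attributes.builder()
                .put(TENANT, tenant)
                .put(NAMESPACE, namespace)
                .put(TOPIC, topic);
        if (partitionIndex >= 0) {
            builder.put(PARTITION, (long) partitionIndex);
        }
        return builder.build();
    }
}
```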
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
+
+ Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+ histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+ "Lookup operations",
+ attrs.toBuilder().put("type", "topic").build());
Review Comment:
When I look at the code, it doesn't look like all 4 of those cases launch a
`/lookup` request; they map to different binary protocol commands.
If that is the case, why name the instrument with a `lookup` prefix?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -239,10 +246,15 @@ String getDescription() {
public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
- this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+ this(new InstrumentProvider(conf), conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
Review Comment:
Continuing the comment above about wrappers:
given we use the Meter directly, can we create a `PulsarClientOpenTelemetry`
once, when the client is built, have it create the Meter once, and then have
all classes call `pulsarClientOpenTelemetry.getMeter()`?
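A minimal sketch of that holder, assuming it is created in `build()` and passed to every component; the class name comes from the comment, while the rest is hypothetical:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical per-client holder: created once when the PulsarClient is built.
final class PulsarClientOpenTelemetry {
    private final Meter meter;

    PulsarClientOpenTelemetry(OpenTelemetry openTelemetry) {
        // The Meter is obtained exactly once and shared by every component of this client.
        this.meter = openTelemetry.getMeter("org.apache.pulsar.client");
    }

    Meter getMeter() {
        return meter;
    }
}
```

Each component would then build its own instruments with the plain OTel builders (`getMeter().counterBuilder(...)`, `histogramBuilder(...)`), keeping names, units, and attributes visible at the call site.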
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
+
+ Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+ histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+ "Lookup operations",
+ attrs.toBuilder().put("type", "topic").build());
+ histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+ "Lookup operations",
+ attrs.toBuilder().put("type", "metadata").build());
Review Comment:
Attributes from different sources get mixed together: Kubernetes (pod,
namespace), the Pulsar client, etc. I believe Pulsar's attributes should be
prefixed with `pulsar.`. In this example, `pulsar.lookup.type` instead of `type`.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+ private final Meter meter;
+ private final MetricsCardinality metricsCardinality;
+
+ public InstrumentProvider(ClientConfigurationData conf) {
+ this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+ this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+ }
+
+ public Attributes getAttributes(String topic) {
+ if (metricsCardinality == MetricsCardinality.None) {
+ return Attributes.empty();
+ }
+
+ AttributesBuilder ab = Attributes.builder();
+ TopicName tn = TopicName.get(topic);
+
+ switch (metricsCardinality) {
+ case Partition:
+ if (tn.isPartitioned()) {
+ ab.put("partition", tn.getPartitionIndex());
+ }
+ // fallthrough
+ case Topic:
+ ab.put("topic", tn.getPartitionedTopicName());
+ // fallthrough
+ case Namespace:
+ ab.put("namespace", tn.getNamespace());
Review Comment:
`pulsar.namespace`
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
}
}, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
}
+
+ connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
+ "Counter of failed connections", Attributes.builder().put("type", "tcp-failed").build());
Review Comment:
I suggest `type` --> `pulsar.failure.type`
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
updateServiceUrl(serviceUrl);
+
+ Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+ histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
Review Comment:
Continuing my comment above: in my opinion, we should stick to the simplest
possible usage of the OTel API, without any wrappers.
They are less readable: constructors are harder to reason about than
builders.
They hide implementation details, which forces me to read the internals
anyway, so they don't help readability. When do I read metrics code? Often,
as a user or developer, I search the code for a metric name and then try to
figure out its attributes. When the code composing the attributes is split,
for example partly here and partly hidden in the wrapper implementation, it is
hard to assemble the list of attributes. You are forced to grab a notepad and
write down how everything works across classes; only then do you know the
attributes. The same goes for behavior: I don't know what `LatencyHistogram`
means, but I know `Histogram` from OTel, so the wrapper forces me to read its code.
In this specific class, having a single histogram instrument and multiple
`Attributes` instances, used together as
`histogram.record(value, getSchemaSuccess)`, is much simpler, where
```java
Attributes getSchemaSuccess = Attributes.builder()
    .put("type", "schema")
    .put("response", "success")
    .build();
```
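Following that suggestion, a sketch of one `pulsar.client.lookup.duration` histogram with precomputed `Attributes` per lookup type and outcome. Assumptions: durations are recorded in seconds (the OTel convention for `.duration` instruments), and the `pulsar.lookup.response` key and the `LookupMetrics` class are hypothetical:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical: one histogram, with the variation carried entirely by Attributes.
final class LookupMetrics {
    static final AttributeKey<String> LOOKUP_TYPE = AttributeKey.stringKey("pulsar.lookup.type");
    static final AttributeKey<String> RESPONSE = AttributeKey.stringKey("pulsar.lookup.response");

    // Built once and reused on every record() call, so no per-request allocation.
    static final Attributes TOPIC_SUCCESS = Attributes.of(LOOKUP_TYPE, "topic", RESPONSE, "success");
    static final Attributes TOPIC_FAILURE = Attributes.of(LOOKUP_TYPE, "topic", RESPONSE, "failure");
    static final Attributes METADATA_SUCCESS = Attributes.of(LOOKUP_TYPE, "metadata", RESPONSE, "success");
    static final Attributes METADATA_FAILURE = Attributes.of(LOOKUP_TYPE, "metadata", RESPONSE, "failure");

    final DoubleHistogram lookupDuration;

    LookupMetrics(Meter meter) {
        lookupDuration = meter.histogramBuilder("pulsar.client.lookup.duration")
                .setUnit("s")
                .setDescription("Duration of lookup operations")
                .build();
    }
}
```

A call site then reads `metrics.lookupDuration.record(elapsedSeconds, LookupMetrics.METADATA_SUCCESS)`, so the metric name and every attribute can be found by searching this one class.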
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -121,12 +152,19 @@ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
*
*/
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
+ long startTime = System.nanoTime();
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
try {
return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
newFutureCreated.setValue(newFuture);
+ newFuture.thenRun(() -> {
+ histoGetBroker.recordSuccess(System.nanoTime() - startTime);
Review Comment:
Why not `histoGetTopicMetadata`?
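A sketch of timing an asynchronous operation so the elapsed time lands on the attributes of the operation that actually ran. `TimedLookups.timed` is a hypothetical helper, and it converts durations to seconds:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: the caller passes the attributes that belong to this exact operation.
final class TimedLookups {
    private TimedLookups() {
    }

    static <T> CompletableFuture<T> timed(DoubleHistogram histogram,
                                          Attributes successAttributes,
                                          Attributes failureAttributes,
                                          Supplier<CompletableFuture<T>> operation) {
        long startNanos = System.nanoTime();
        CompletableFuture<T> future = operation.get();
        // whenComplete runs on both success and failure, unlike thenRun, which skips failures.
        future.whenComplete((result, error) -> {
            double seconds = (System.nanoTime() - startNanos) / (double) TimeUnit.SECONDS.toNanos(1);
            histogram.record(seconds, error == null ? successAttributes : failureAttributes);
        });
        return future;
    }
}
```

For `getPartitionedTopicMetadata`, the caller would pass the metadata success and failure attributes rather than the broker-lookup ones.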
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
}
}, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
}
+
+ connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
+ "Counter of failed connections", Attributes.builder().put("type", "tcp-failed").build());
Review Comment:
Suggested description: "The number of failed connection attempts"
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+ "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+ consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+ "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+ messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+ "Number of messages received", attrs);
Review Comment:
"Number of messages received" --> "The number of messages explicitly
received by the consumer - i.e., called receive() or batchReceive()."
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
}
}, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
}
+
+ connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
Review Comment:
The unit should be `{connection}` according to
https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units
This comment applies to all `pulsar.client.connection.*` counters
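A sketch of the connection counters with the `{connection}` unit, also using the descriptions and the `pulsar.failure.type` key suggested in other comments on this PR; the class and method names are hypothetical:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical holder for connection counters using the OTel curly-brace annotation unit.
final class ConnectionMetrics {
    static final String CONNECTION_UNIT = "{connection}";
    static final AttributeKey<String> FAILURE_TYPE = AttributeKey.stringKey("pulsar.failure.type");
    private static final Attributes TCP_FAILED = Attributes.of(FAILURE_TYPE, "tcp-failed");

    final LongCounter opened;
    final LongCounter failed;

    ConnectionMetrics(Meter meter) {
        opened = meter.counterBuilder("pulsar.client.connections.opened")
                .setUnit(CONNECTION_UNIT)
                .setDescription("The number of connections opened")
                .build();
        failed = meter.counterBuilder("pulsar.client.connections.failed")
                .setUnit(CONNECTION_UNIT)
                .setDescription("The number of failed connection attempts")
                .build();
    }

    // Counts one failed TCP connection attempt.
    void onTcpConnectFailure() {
        failed.add(1, TCP_FAILED);
    }
}
```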
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java:
##########
@@ -114,6 +121,12 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
+
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(consumerBase.getTopic());
+ consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.ack.timeout", Unit.Messages,
Review Comment:
In the description, we count events, but the unit is messages. Are we
counting messages whose acknowledgment requests have timed out? I'm not
sure I understand this metric.
If messages are counted, then maybe we should use
`pulsar.client.consumer.message.ack.timeout`.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
Review Comment:
The variable is about how many consumers were opened, but we count sessions
in the unit and name. What is the definition of a session?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+ private final Meter meter;
+ private final MetricsCardinality metricsCardinality;
+
+ public InstrumentProvider(ClientConfigurationData conf) {
+ this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+ this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+ }
+
+ public Attributes getAttributes(String topic) {
+ if (metricsCardinality == MetricsCardinality.None) {
+ return Attributes.empty();
+ }
+
+ AttributesBuilder ab = Attributes.builder();
+ TopicName tn = TopicName.get(topic);
+
+ switch (metricsCardinality) {
+ case Partition:
+ if (tn.isPartitioned()) {
+ ab.put("partition", tn.getPartitionIndex());
+ }
+ // fallthrough
+ case Topic:
+ ab.put("topic", tn.getPartitionedTopicName());
Review Comment:
`pulsar.topic`
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+ "Publish latency experienced by the application, includes client batching time", attrs);
+ rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+ "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+ publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
Review Comment:
I suggest `pulsar.client.producer.rpc.send.size`
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java:
##########
@@ -395,6 +397,11 @@ public class ClientConfigurationData implements Serializable, Cloneable {
)
private String description;
+
+ private transient OpenTelemetry openTelemetry;
Review Comment:
1. Is it transient because we're serializing this class?
2. How can a configuration class, which should be a data class, contain a
live instance? Why isn't this kept at the builder level?
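The risk behind question 1 can be shown with plain Java serialization: a `transient` field silently comes back as `null` after a round trip. `ConfigSketch` and `LiveTelemetryHandle` are hypothetical stand-ins for `ClientConfigurationData` and the `OpenTelemetry` instance:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a live, non-serializable object such as an OpenTelemetry instance.
final class LiveTelemetryHandle {
}

// Hypothetical config class mirroring the pattern under review.
final class ConfigSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    String description = "my-client";
    // Skipped by serialization; field initializers are NOT re-run on deserialization.
    transient LiveTelemetryHandle openTelemetry = new LiveTelemetryHandle();

    // Serializes and deserializes a copy, as would happen when a config is persisted or shipped.
    static ConfigSketch roundTrip(ConfigSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfigSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The data fields survive the round trip while the live handle does not, which is one argument for keeping the instance on the builder and out of the data class.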
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+ private final Meter meter;
+ private final MetricsCardinality metricsCardinality;
+
+ public InstrumentProvider(ClientConfigurationData conf) {
+ this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
Review Comment:
Since Pulsar's client is a library, as opposed to Pulsar Broker, we should
also add the version to the meter. See:
https://opentelemetry.io/docs/concepts/instrumentation/code-based/#configure-the-opentelemetry-api
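A sketch of creating the meter with an instrumentation version through `MeterProvider.meterBuilder(...)`. Passing `PulsarVersion.getVersion()` as the version is an assumption, and `PulsarClientMeters` is a hypothetical name:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical factory: attaches the library version to the instrumentation scope.
final class PulsarClientMeters {
    private PulsarClientMeters() {
    }

    static Meter create(OpenTelemetry openTelemetry, String clientVersion) {
        return openTelemetry.getMeterProvider()
                .meterBuilder("org.apache.pulsar.client")
                .setInstrumentationVersion(clientVersion)
                .build();
    }
}
```

Usage would be along the lines of `PulsarClientMeters.create(openTelemetry, PulsarVersion.getVersion())`, letting backends tell metrics from different client versions apart.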
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
this.idleState = new ClientCnxIdleState(this);
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+ this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
Review Comment:
I asked in the OTel Slack, since this contradicts their guidelines, but I don't have a better alternative yet. I'll get back to you on that.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+ "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+ consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+ "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+ messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+ "Number of messages received", attrs);
+ bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+ "Bytes received", attrs);
+ messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
+ "Number of messages currently sitting in the consumer pre-fetch queue", attrs);
+ bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched", Unit.Bytes,
+ "Total number of bytes currently sitting in the consumer pre-fetch queue", attrs);
+
+ consumerAcksCounter = ip.newCounter("pulsar.client.consumer.ack", Unit.Messages,
+ "Number of ack operations", attrs);
Review Comment:
How about "The number of acknowledged messages"?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+ "Publish latency experienced by the application, includes client batching time", attrs);
+ rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
Review Comment:
I suggest `pulsar.client.producer.send.duration`, or, if we want to keep "rpc", `pulsar.client.publish.rpc.send.duration`.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
Review Comment:
1. "Latency" isn't used anywhere I checked in OTel.
2. The term "publish" does not appear in `Producer.java`, so I think it's better to stick with "send". It also aligns well with the Consumer's "receive".

I suggest `pulsar.client.producer.message.send.duration`.
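A sketch of that histogram using the plain OpenTelemetry API. It records seconds with unit `s`, which is what OpenTelemetry's semantic conventions use for durations (for example `http.server.request.duration`). The `ProducerSendDuration` class and its nanosecond-to-second helper are hypothetical; the description is copied from the diff. Assumes `opentelemetry-api` on the classpath.

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.concurrent.TimeUnit;

// Hypothetical: the application-facing send duration under the suggested name.
class ProducerSendDuration {
    static final String NAME = "pulsar.client.producer.message.send.duration";

    private final DoubleHistogram histogram;

    ProducerSendDuration(Meter meter) {
        this.histogram = meter.histogramBuilder(NAME)
                .setUnit("s")
                .setDescription("Publish latency experienced by the application, includes client batching time")
                .build();
    }

    // Converts a System.nanoTime() delta to seconds.
    static double toSeconds(long elapsedNanos) {
        return elapsedNanos / (double) TimeUnit.SECONDS.toNanos(1);
    }

    // Records the time between the send call and its completion.
    void record(long startNanos, long endNanos, Attributes attrs) {
        histogram.record(toSeconds(endNanos - startNanos), attrs);
    }

    public static void main(String[] args) {
        ProducerSendDuration d =
                new ProducerSendDuration(OpenTelemetry.noop().getMeter("org.apache.pulsar.client"));
        long start = System.nanoTime();
        d.record(start, start + 1_500_000L, Attributes.empty());
        System.out.println(toSeconds(1_500_000L));
    }
}
```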
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
topicNameWithoutPartition = topicName.getPartitionedTopicName();
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+ "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+ consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+ "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+ messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+ "Number of messages received", attrs);
+ bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+ "Bytes received", attrs);
Review Comment:
This should mirror the previous description, but for bytes.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java:
##########
@@ -60,11 +64,31 @@ public class HttpLookupService implements LookupService {
private static final String BasePathV1 = "lookup/v2/destination/";
private static final String BasePathV2 = "lookup/v2/topic/";
- public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
+ private final LatencyHistogram histoGetBroker;
+ private final LatencyHistogram histoGetTopicMetadata;
+ private final LatencyHistogram histoGetSchema;
+ private final LatencyHistogram histoListTopics;
+
+ public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
this.httpClient = new HttpClient(conf, eventLoopGroup);
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();
+
+ Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+ histoGetBroker = instrumentProvider.newLatencyHistogram("pulsar.client.lookup",
Review Comment:
Although OpenTelemetry allows creating an instrument with the same name in more than one place, I think we should create a dedicated metrics class for lookups and use it in both the binary and HTTP lookup services. It's less confusing for the reader, and especially for the "searcher" looking for where the metric is defined.
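A rough sketch of such a class: a single histogram defined in one place, with the transport and the lookup operation as attributes. Everything here (`LookupMetrics`, the `pulsar.client.lookup.duration` name, the `type` key, and the `Operation` enum) is hypothetical; only the `transport-type` key comes from the diff. Assumes `opentelemetry-api` on the classpath.

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.Locale;

// Hypothetical dedicated lookup metrics, shared by the binary and HTTP lookup services.
final class LookupMetrics {
    static final AttributeKey<String> TRANSPORT = AttributeKey.stringKey("transport-type");
    static final AttributeKey<String> OPERATION = AttributeKey.stringKey("type"); // hypothetical key

    enum Operation { GET_BROKER, GET_TOPIC_METADATA, GET_SCHEMA, LIST_TOPICS }

    private final DoubleHistogram duration;

    LookupMetrics(Meter meter) {
        // The instrument is defined exactly once, here.
        this.duration = meter.histogramBuilder("pulsar.client.lookup.duration")
                .setUnit("s")
                .setDescription("Duration of lookup operations")
                .build();
    }

    static Attributes attributesFor(String transportType, Operation op) {
        return Attributes.of(TRANSPORT, transportType, OPERATION, op.name().toLowerCase(Locale.ROOT));
    }

    void record(String transportType, Operation op, double seconds) {
        duration.record(seconds, attributesFor(transportType, op));
    }

    public static void main(String[] args) {
        LookupMetrics metrics = new LookupMetrics(OpenTelemetry.noop().getMeter("org.apache.pulsar.client"));
        metrics.record("binary", Operation.GET_BROKER, 0.012); // from the binary lookup service
        metrics.record("http", Operation.LIST_TOPICS, 0.034);  // from HttpLookupService
        System.out.println(attributesFor("http", Operation.LIST_TOPICS).get(OPERATION)); // list_topics
    }
}
```

The client would create one `LookupMetrics` and pass it to both services, so searching for the metric name leads to a single definition.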
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
+ InstrumentProvider ip = client.instrumentProvider();
+ Attributes attrs = ip.getAttributes(topic);
+ latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+ "Publish latency experienced by the application, includes client batching time", attrs);
+ rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+ "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+ publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
+ Unit.Bytes, "Bytes published", attrs);
+ pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,
+ "Pending messages for this producer", attrs);
+ pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.count", Unit.Bytes,
+ "Pending bytes for this producer", attrs);
+ producersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
Review Comment:
Seeing the same metric defined twice (for producer and consumer) is confusing. Also, the rule of thumb is that aggregating a single instrument across all of its attributes should yield a meaningful value. I'm not sure that summing producer and consumer sessions is meaningful. Maybe each should have its own session instrument?
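One possible shape, sketched with hypothetical instrument names (`pulsar.client.producer.opened` and so on) and the `{session}` unit annotation: each session kind gets its own pair of counters, so summing any one instrument over all of its attributes counts only one kind of session. Assumes `opentelemetry-api` on the classpath.

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

// Hypothetical: separate instruments per session kind instead of one
// "pulsar.client.session.*" instrument split by a "type" attribute.
final class SessionMetrics {
    private final LongCounter opened;
    private final LongCounter closed;

    private SessionMetrics(Meter meter, String kind) {
        this.opened = meter.counterBuilder(openedName(kind))
                .setUnit("{session}")
                .setDescription("The number of " + kind + " sessions opened")
                .build();
        this.closed = meter.counterBuilder(closedName(kind))
                .setUnit("{session}")
                .setDescription("The number of " + kind + " sessions closed")
                .build();
    }

    static String openedName(String kind) {
        return "pulsar.client." + kind + ".opened";
    }

    static String closedName(String kind) {
        return "pulsar.client." + kind + ".closed";
    }

    static SessionMetrics forProducer(Meter meter) {
        return new SessionMetrics(meter, "producer");
    }

    static SessionMetrics forConsumer(Meter meter) {
        return new SessionMetrics(meter, "consumer");
    }

    void onOpen(Attributes attrs) {
        opened.add(1, attrs);
    }

    void onClose(Attributes attrs) {
        closed.add(1, attrs);
    }

    public static void main(String[] args) {
        Meter meter = OpenTelemetry.noop().getMeter("org.apache.pulsar.client");
        SessionMetrics producers = forProducer(meter);
        producers.onOpen(Attributes.empty());
        System.out.println(openedName("producer")); // pulsar.client.producer.opened
    }
}
```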
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
this.idleState = new ClientCnxIdleState(this);
this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+ this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+ "Counter of connections opened", Attributes.empty());
Review Comment:
The same applies to the closed counter below.
--
This is an automated message from the Apache Git Service.