asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1510298288


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+
+    ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality metricsCardinality);

Review Comment:
   As discussed in the community bi-weekly meeting, I prefer to avoid any 
"add-ons" / wrappers around OpenTelemetry and use it as plainly as possible to 
keep things simple.
   
   For this point specifically, we can record the metrics using *all* the attributes we want, including high-cardinality ones such as topic. OpenTelemetry [added](https://github.com/open-telemetry/opentelemetry-specification/pull/3546/files) an experimental metrics advice API to the specification, allowing an instrument to specify the default set of attributes for its default view. This means we can record with both the namespace and topic attributes, while the default view keeps only namespace.
   
   This was implemented in the Java SDK, in a separate interface and artifact that we can depend on:
   ```
   /** Extended {@link DoubleCounterBuilder} with experimental APIs. */
   public interface ExtendedDoubleCounterBuilder extends DoubleCounterBuilder {
   
     /**
      * Specify the attribute advice, which suggests the recommended set of attribute keys to be used
      * for this counter.
      */
     default ExtendedDoubleCounterBuilder setAttributesAdvice(List<AttributeKey<?>> attributes) {
       return this;
     }
   }
   ```
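   As a rough illustration of what the advice buys us, the stdlib-only sketch below simulates a default view that keeps only the advised keys. It is a hypothetical model, not the OpenTelemetry SDK: the class name is invented and `Map<String, String>` stands in for OTel `Attributes`. Measurements that differ only in `topic` collapse into one `namespace` series, while passing both keys as advice keeps the per-topic series.
   
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeMap;
   
   /**
    * Stdlib-only simulation (NOT the OpenTelemetry SDK): every measurement is
    * recorded with all attributes, but only the advised keys survive aggregation.
    */
   final class AttributeAdviceSimulation {
   
     private AttributeAdviceSimulation() {
     }
   
     /** Sums counter increments per attribute set after dropping non-advised keys. */
     static Map<Map<String, String>, Long> aggregate(
         List<Map<String, String>> recordedAttributes,
         List<Long> increments,
         Set<String> advisedKeys) {
       if (recordedAttributes.size() != increments.size()) {
         throw new IllegalArgumentException("one increment per measurement is required");
       }
       Map<Map<String, String>, Long> series = new HashMap<>();
       for (int i = 0; i < recordedAttributes.size(); i++) {
         Map<String, String> kept = new TreeMap<>();
         for (Map.Entry<String, String> attribute : recordedAttributes.get(i).entrySet()) {
           if (advisedKeys.contains(attribute.getKey())) {
             kept.put(attribute.getKey(), attribute.getValue());
           }
         }
         // Measurements that differ only in non-advised keys (e.g. topic) merge here.
         series.merge(kept, increments.get(i), Long::sum);
       }
       return series;
     }
   }
   ```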
   
   The issue that started all of this in December 2022: 
https://github.com/open-telemetry/opentelemetry-specification/issues/3061.
   
   Given all of this, I would remove this line from the builder.



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -554,6 +558,10 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
      */
     ClientBuilder enableBusyWait(boolean enableBusyWait);
 
+    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);

Review Comment:
   This supports the use case where the user has chosen OpenTelemetry to define/export their metrics *and* has done so explicitly, so they have an `OpenTelemetry` instance they can pass to the Pulsar client.
   There is also the possibility that a user simply installs the OTel Java agent (via JVM command-line parameters). In that case, the Java agent builds an `OpenTelemetry` instance and registers it in `GlobalOpenTelemetry`. Hence, in the `ClientBuilder.build()` implementation, we can default to `GlobalOpenTelemetry.get()` when the user has not supplied an `OpenTelemetry` instance, and the Java agent use case will just work: metrics will be registered and exported.
   
   If the Java agent was not installed AND the `openTelemetry()` method wasn't used, defaulting to `GlobalOpenTelemetry.get()` will simply give us a no-op `OpenTelemetry`.
   
   If the user decided to use `GlobalOpenTelemetry` explicitly, that's also OK: we'll just use it, even though they didn't pass the `OpenTelemetry` instance to the builder.
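   The three cases above boil down to a single fallback at build time. The dependency-free sketch below models it with a generic type so it runs without OpenTelemetry on the classpath; the class and method names are hypothetical, and in the client `T` would be `io.opentelemetry.api.OpenTelemetry` with `GlobalOpenTelemetry::get` as the supplier.
   
   ```java
   import java.util.function.Supplier;
   
   /** Hypothetical model of the build()-time fallback; T stands in for OpenTelemetry. */
   final class OpenTelemetryFallback {
   
     private OpenTelemetryFallback() {
     }
   
     static <T> T resolve(T explicitlyConfigured, Supplier<T> global) {
       // 1. The user called ClientBuilder.openTelemetry(...): use that instance.
       if (explicitlyConfigured != null) {
         return explicitlyConfigured;
       }
       // 2. Otherwise defer to the global instance: the one the Java agent (or the
       //    application) registered, or a no-op instance when nobody did.
       return global.get();
     }
   }
   ```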
   
   



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##########
@@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
     ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
 
     /**
-     * Set the interval between each stat info <i>(default: 60 seconds)</i> Stats will be activated with positive
+     * Set the interval between each stat info <i>(default: disabled)</i> Stats will be activated with positive

Review Comment:
   On the server side, we have marked the OpenTelemetry metrics as experimental, since we might get feedback and decide to name them differently. For example, we might change something on the server side and then decide to make the matching change on the client side. If we mark the client-side OTel metrics as experimental too, we keep the freedom to change them.
   If we do so, maybe it's better to leave the 60-second default as is for now.
   



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java:
##########
@@ -29,9 +30,12 @@
  *
  * <p>All the stats are relative to the last recording period. The interval of the stats refreshes is configured with
  * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute.
+ * 
+ * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats

Review Comment:
   I think it's very important to add documentation covering the following:
   1. The new metrics: name, unit, attributes, and instrument type.
   2. How to use them. Not many people are familiar with OpenTelemetry:
      - Links to the appropriate places for configuration, perhaps with a short description of the possibilities.
      - An explanation that bridges already exist for users of Dropwizard or Micrometer.
      - As a last resort: define your own MetricReader to plug into your own metrics system.
   
      All of those, in my opinion, should be in the documentation.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,

Review Comment:
   1. Why at the client level and not at the topic level? Having that per 
(broker, namespace, topic) makes sense, no?
   
   Now I see that there was a line that filled in the attribute. Again, I'm all for setting it explicitly, so it is easy to spot when you read the code looking for a certain metric.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java:
##########
@@ -121,6 +123,18 @@ public ClientBuilder authentication(Authentication authentication) {
         return this;
     }
 
+    @Override
+    public ClientBuilder openTelemetry(OpenTelemetry openTelemetry) {
+        conf.setOpenTelemetry(openTelemetry);

Review Comment:
   Check later if there is a null check inside



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+                "Counter of connections opened", Attributes.empty());

Review Comment:
   For description, how about "The number of connections opened"?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Is the instrument the duration of a lookup request? If so, it should be suffixed with `.duration`: `pulsar.client.lookup.duration`.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,

Review Comment:
   The bytes and messages counters use the same instrument name.
   
   How about?
   - `pulsar.client.consumer.message.received.count`
   - `pulsar.client.consumer.message.received.size`
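   
   The clash can be sketched with a hypothetical registry that identifies instruments by name only and flags a second registration under the same name with a different unit. This illustrates why the two counters need distinct names; it is not a description of how the OpenTelemetry SDK reports duplicates. The unit strings follow the UCUM-style `By` / `{message}` convention.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   /** Hypothetical registry keyed by instrument name only. */
   final class InstrumentNameRegistry {
   
     private final Map<String, String> unitByName = new HashMap<>();
   
     /** Returns true if the name is new, or re-registered with the same unit. */
     boolean register(String name, String unit) {
       String existing = unitByName.putIfAbsent(name, unit);
       // A different unit under an existing name is the conflict described above.
       return existing == null || existing.equals(unit);
     }
   }
   ```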
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
+                "Number of messages currently sitting in the consumer pre-fetch queue", attrs);
+        bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched", Unit.Bytes,
+                "Total number of bytes currently sitting in the consumer pre-fetch queue", attrs);
+
+        consumerAcksCounter = ip.newCounter("pulsar.client.consumer.ack", Unit.Messages,

Review Comment:
   1. The unit is Messages but the instrument name contains operations.
   2. How about:
      - `pulsar.client.consumer.message.ack`
      - `pulsar.client.consumer.message.nack`
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -1228,7 +1228,10 @@ public int getTotalIncomingMessages() {
 
     protected void clearIncomingMessages() {
         // release messages if they are pooled messages
-        incomingMessages.forEach(Message::release);
+        incomingMessages.forEach(msg -> {

Review Comment:
   Why expand?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
+                Unit.Bytes, "Bytes published", attrs);
+        pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,

Review Comment:
   I suggest: 
   - `pulsar.client.producer.message.pending.count`
   - `pulsar.client.producer.message.pending.size`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Note: should it be `pulsar.client.operations.lookup`, i.e., prefixed with `operations`, since we'll have more of those further down?
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java:
##########
@@ -137,7 +139,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
         conf.setMaxLookupRedirects(10);
 
         @Cleanup
-        LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
+        LookupService lookupService = useHttp ? new HttpLookupService(new InstrumentProvider(new ClientConfigurationData()), conf, eventExecutors) :

Review Comment:
   Check what InstrumentProvider is



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+                "Counter of connections opened", Attributes.empty());
+        this.connectionsClosedCounter = instrumentProvider.newCounter("pulsar.client.connections.closed", Unit.Connections,

Review Comment:
   Shouldn't we include target details? Perhaps Cluster or Broker?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,

Review Comment:
   "preteched" --> "prefetched"
   
   How about?
   - `pulsar.client.consumer.message.prefetched.count`
   - `pulsar.client.consumer.message.prefetched.size`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());
+        histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",

Review Comment:
   How about "Duration of lookup operations"?
   (I have a comment below asking how this can be, since these seem to be durations for 4 different types of binary commands.)



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+        this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+    }
+
+    public Attributes getAttributes(String topic) {
+        if (metricsCardinality == MetricsCardinality.None) {
+            return Attributes.empty();
+        }
+
+        AttributesBuilder ab = Attributes.builder();
+        TopicName tn = TopicName.get(topic);
+
+        switch (metricsCardinality) {
+            case Partition:
+                if (tn.isPartitioned()) {
+                    ab.put("partition", tn.getPartitionIndex());
+                }
+                // fallthrough
+            case Topic:
+                ab.put("topic", tn.getPartitionedTopicName());
+                // fallthrough
+            case Namespace:
+                ab.put("namespace", tn.getNamespace());
+                // fallthrough
+            case Tenant:
+                ab.put("tenant", tn.getTenant());

Review Comment:
   `pulsar.tenant`
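   
   With the `pulsar.` prefix suggested here (and for the namespace and topic keys), the attribute composition could look roughly like the stdlib-only sketch below. It is hypothetical: the class name and the `pulsar.partition` key are extrapolations of the same naming pattern, the parser only understands fully qualified `domain://tenant/namespace/topic` names (the real code would keep using `TopicName`), and a plain `Map` replaces OTel `Attributes` so it runs without dependencies. It attaches every level rather than switching on a cardinality setting, in line with the earlier suggestion to record all attributes and let the default view narrow them down.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   /** Hypothetical sketch of topic attributes with pulsar.-prefixed keys. */
   final class TopicAttributes {
   
     private static final String PARTITION_SUFFIX = "-partition-";
   
     private TopicAttributes() {
     }
   
     static Map<String, Object> of(String fullTopicName) {
       int schemeEnd = fullTopicName.indexOf("://");
       String[] parts = schemeEnd < 0
           ? new String[0]
           : fullTopicName.substring(schemeEnd + 3).split("/", 3);
       if (parts.length != 3) {
         throw new IllegalArgumentException("expected domain://tenant/namespace/topic: " + fullTopicName);
       }
       String localName = parts[2];
       String topic = fullTopicName;
       Integer partition = null;
       int suffixAt = localName.lastIndexOf(PARTITION_SUFFIX);
       if (suffixAt > 0) {
         String index = localName.substring(suffixAt + PARTITION_SUFFIX.length());
         if (!index.isEmpty() && index.chars().allMatch(Character::isDigit)) {
           partition = Integer.parseInt(index);
           // Strip "-partition-N" so all partitions share one pulsar.topic value.
           topic = fullTopicName.substring(0, fullTopicName.length() - localName.length() + suffixAt);
         }
       }
       Map<String, Object> attributes = new LinkedHashMap<>();
       attributes.put("pulsar.tenant", parts[0]);
       attributes.put("pulsar.namespace", parts[0] + "/" + parts[1]);
       attributes.put("pulsar.topic", topic);
       if (partition != null) {
         attributes.put("pulsar.partition", partition);
       }
       return attributes;
     }
   }
   ```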



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());

Review Comment:
   When I look at the code, not all of those 4 cases launch a `/lookup` request; they map to different binary protocol commands. If that is the case, why prefix the instrument name with `lookup`?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -239,10 +246,15 @@ String getDescription() {
 
 
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
-        this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+        this(new InstrumentProvider(conf), conf, eventLoopGroup, Commands.getCurrentProtocolVersion());

Review Comment:
   Continuing the comment above about wrappers:
   Given we use Meter directly, can we instantiate `PulsarClientOpenTelemetry` once when the client is built, have it create the Meter once, and then have all classes call `pulsarClientOpenTelemetry.getMeter()`?
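   
   A minimal sketch of that shape, under stated assumptions: `PulsarClientOpenTelemetry` is the hypothetical class named in the comment, `M` stands in for `io.opentelemetry.api.metrics.Meter`, and the factory function stands in for `openTelemetry.getMeter(...)`, so the code runs without OpenTelemetry. The scope name is the one the PR's `InstrumentProvider` uses.
   
   ```java
   import java.util.function.Function;
   
   /** Hypothetical per-client holder that owns the single Meter all components share. */
   final class PulsarClientOpenTelemetry<M> {
   
     // Scope name taken from the PR's InstrumentProvider.
     static final String SCOPE_NAME = "org.apache.pulsar.client";
   
     private final M meter;
   
     PulsarClientOpenTelemetry(Function<String, M> meterFactory) {
       // The Meter is created exactly once, when the client is built.
       this.meter = meterFactory.apply(SCOPE_NAME);
     }
   
     /** Consumers, producers, ClientCnx, etc. all ask for the same instance. */
     M getMeter() {
       return meter;
     }
   }
   ```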



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "topic").build());
+        histoGetTopicMetadata = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",
+                "Lookup operations",
+                attrs.toBuilder().put("type", "metadata").build());

Review Comment:
   Attributes can be mixed from different sources: Kubernetes (pod, namespace), the Pulsar client, etc. I believe Pulsar attributes should be prefixed with `pulsar.`. In this example, `pulsar.lookup.type` instead of `type`.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+        this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+    }
+
+    public Attributes getAttributes(String topic) {
+        if (metricsCardinality == MetricsCardinality.None) {
+            return Attributes.empty();
+        }
+
+        AttributesBuilder ab = Attributes.builder();
+        TopicName tn = TopicName.get(topic);
+
+        switch (metricsCardinality) {
+            case Partition:
+                if (tn.isPartitioned()) {
+                    ab.put("partition", tn.getPartitionIndex());
+                }
+                // fallthrough
+            case Topic:
+                ab.put("topic", tn.getPartitionedTopicName());
+                // fallthrough
+            case Namespace:
+                ab.put("namespace", tn.getNamespace());

Review Comment:
   `pulsar.namespace`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
                 }
             }, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
         }
+
+        connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
+                "Counter of failed connections", Attributes.builder().put("type", "tcp-failed").build());

Review Comment:
   I suggest `type` --> `pulsar.failure.type`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -84,6 +92,21 @@ public BinaryProtoLookupService(PulsarClientImpl client,
         this.serviceNameResolver = new PulsarServiceNameResolver();
         this.listenerName = listenerName;
         updateServiceUrl(serviceUrl);
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Continuing my comment above: in my opinion, we should stick to the simplest possible usage of the OTel API, without any wrappers.
   They are less readable: constructors are harder to reason about than builders.
   They hide implementation details, which forces me to read the internals anyway, so they don't help readability. When do I read those metrics? A lot of the time, as a user or developer, I search the code for a metric name and then try to figure out its attributes. When the code composing the attributes is split, for example with some of it here and some hidden in the wrapper implementation, it is hard to compile the full list of attributes. You are forced to grab a notepad and write down how everything works across classes, and only then do you know the attributes. The same goes for behavior: I don't know what `LatencyHistogram` means; I know `Histogram` from OTel, so it forces me to read its code.
   
   In this specific class, having a single histogram instrument and multiple `Attributes`, then using them together as in `histogram.record(value, getSchemaSuccess)`, is much simpler, where
   ```
   Attributes getSchemaSuccess = Attributes.builder()
       .put("type", "schema")
       .put("response", "success")
       .build();
   ```
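   
   Expanding the snippet above into something runnable, the sketch below keeps a single histogram and passes precomputed attribute sets on each call, with the value first as in OTel's `record(value, attributes)`. It is a stdlib-only stand-in: `Map<String, String>` plays the role of `Attributes`, the class name and the `failure` response value are hypothetical, and the aggregation (count and sum per attribute set) is simplified and not thread-safe.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   /** Stdlib-only stand-in for ONE histogram instrument recorded with several attribute sets. */
   final class LookupDurations {
   
     // Precomputed attribute sets, one per (type, response) combination.
     static final Map<String, String> GET_SCHEMA_SUCCESS = Map.of("type", "schema", "response", "success");
     static final Map<String, String> GET_SCHEMA_FAILURE = Map.of("type", "schema", "response", "failure");
   
     // count at index 0, sum of durations (ns) at index 1, per attribute set
     private final Map<Map<String, String>, long[]> countAndSumNanos = new HashMap<>();
   
     /** Mirrors histogram.record(value, attributes): value first, then attributes. */
     void record(long durationNanos, Map<String, String> attributes) {
       long[] stats = countAndSumNanos.computeIfAbsent(attributes, key -> new long[2]);
       stats[0]++;
       stats[1] += durationNanos;
     }
   
     long count(Map<String, String> attributes) {
       long[] stats = countAndSumNanos.get(attributes);
       return stats == null ? 0 : stats[0];
     }
   
     long sumNanos(Map<String, String> attributes) {
       long[] stats = countAndSumNanos.get(attributes);
       return stats == null ? 0 : stats[1];
     }
   }
   ```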
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java:
##########
@@ -121,12 +152,19 @@ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
      *
      */
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
+        long startTime = System.nanoTime();
         final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
         try {
             return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
                 CompletableFuture<PartitionedTopicMetadata> newFuture =
                         getPartitionedTopicMetadata(serviceNameResolver.resolveHost(), topicName);
                 newFutureCreated.setValue(newFuture);
+                newFuture.thenRun(() -> {
+                    histoGetBroker.recordSuccess(System.nanoTime() - startTime);

Review Comment:
   Why not `histoGetTopicMetadata`?
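   
   Independently of this particular fix, a small helper that takes the target recorder as an argument makes this kind of mix-up harder, because each call site names the histogram it records into. The sketch below is hypothetical: the class and method names are invented, it assumes the wrapper's `recordSuccess` takes the elapsed nanoseconds as a `long` (as the diff suggests), so a method reference such as `histoGetTopicMetadata::recordSuccess` would fit a `LongConsumer`, and it also records failures, which the diff does not show.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.LongConsumer;
   
   /** Hypothetical helper: time an async operation and record into ITS OWN recorders. */
   final class AsyncTiming {
   
     private AsyncTiming() {
     }
   
     static <T> CompletableFuture<T> timed(CompletableFuture<T> future,
                                           long startNanos,
                                           LongConsumer onSuccess,
                                           LongConsumer onFailure) {
       return future.whenComplete((result, error) -> {
         long elapsed = System.nanoTime() - startNanos;
         if (error == null) {
           onSuccess.accept(elapsed);
         } else {
           onFailure.accept(elapsed);
         }
       });
     }
   }
   ```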



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
                 }
             }, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
         }
+
+        connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,
+                "Counter of failed connections", Attributes.builder().put("type", "tcp-failed").build());

Review Comment:
   Suggested description: "The number of failed connection attempts"



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);

Review Comment:
   "Number of messages received" --> "The number of messages explicitly received by the consumer, i.e., via calls to receive() or batchReceive()."



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -146,6 +156,12 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou
                 }
             }, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS);
         }
+
+        connectionsTcpFailureCounter = instrumentProvider.newCounter("pulsar.client.connections.failed", Unit.None,

Review Comment:
   The unit should be `{connection}` according to https://opentelemetry.io/docs/specs/semconv/general/metrics/#instrument-units
   This comment applies to all `pulsar.client.connections.*` counters.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java:
##########
@@ -114,6 +121,12 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
         ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
         this.readLock = readWriteLock.readLock();
         this.writeLock = readWriteLock.writeLock();
+
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(consumerBase.getTopic());
+        consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.ack.timeout", Unit.Messages,

Review Comment:
   The description counts events, but the unit is messages. Are we counting messages whose acknowledgment requests have timed out? I'm not sure I understand this metric.
   If messages are counted, then maybe we should use `pulsar.client.consumer.message.ack.timeout`.
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   The variable is about how many consumers were opened, but the unit and the name count sessions. What is the definition of a session?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");
+        this.metricsCardinality = conf.getOpenTelemetryMetricsCardinality();
+    }
+
+    public Attributes getAttributes(String topic) {
+        if (metricsCardinality == MetricsCardinality.None) {
+            return Attributes.empty();
+        }
+
+        AttributesBuilder ab = Attributes.builder();
+        TopicName tn = TopicName.get(topic);
+
+        switch (metricsCardinality) {
+            case Partition:
+                if (tn.isPartitioned()) {
+                    ab.put("partition", tn.getPartitionIndex());
+                }
+                // fallthrough
+            case Topic:
+                ab.put("topic", tn.getPartitionedTopicName());

Review Comment:
   I suggest `pulsar.topic` as the attribute name.
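   A dependency-free sketch of the suggested naming, assuming the attributes are built as a plain map. `TopicAttributes` is hypothetical, the `pulsar.partition` key is an assumption that extends the same `pulsar.` prefix, and the parsing relies on Pulsar's `-partition-<index>` suffix for partitions of a partitioned topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds metric attributes with "pulsar."-prefixed keys.
final class TopicAttributes {
    // Pulsar names each partition "<topic>-partition-<index>".
    private static final String PARTITION_SUFFIX = "-partition-";

    static Map<String, Object> of(String topic) {
        Map<String, Object> attrs = new LinkedHashMap<>();
        String baseTopic = topic;
        int idx = topic.lastIndexOf(PARTITION_SUFFIX);
        if (idx >= 0) {
            try {
                int partition = Integer.parseInt(topic.substring(idx + PARTITION_SUFFIX.length()));
                if (partition >= 0) {
                    baseTopic = topic.substring(0, idx);
                    attrs.put("pulsar.partition", partition); // assumed key name
                }
            } catch (NumberFormatException e) {
                // Not a numeric partition suffix; treat the whole name as the topic.
            }
        }
        attrs.put("pulsar.topic", baseTopic);
        return attrs;
    }
}
```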



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",

Review Comment:
   I suggest `pulsar.client.producer.rpc.send.size`



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java:
##########
@@ -395,6 +397,11 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
     private String description;
 
+
+    private transient OpenTelemetry openTelemetry;

Review Comment:
   1. Is this transient because we're serializing this class?
   2. How can a configuration class, which should be a data class, contain an instance? Why isn't this saved at the builder level?
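   A sketch of keeping the runtime instance at the builder level, so that the configuration stays a plain serializable data class with no `transient` fields. `TelemetryHandle` is a hypothetical stand-in for `io.opentelemetry.api.OpenTelemetry`, and `ClientConf`, `ClientBuilderSketch`, and `SerializationCheck` are illustrative names, not Pulsar classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a runtime object such as io.opentelemetry.api.OpenTelemetry.
interface TelemetryHandle {
}

// Plain data: every field is serializable, so no transient field is needed.
final class ClientConf implements Serializable {
    private static final long serialVersionUID = 1L;
    String serviceUrl;
    String description;
}

// The builder holds the runtime instance next to the data instead of inside it.
final class ClientBuilderSketch {
    private final ClientConf conf = new ClientConf();
    private TelemetryHandle telemetry;

    ClientBuilderSketch serviceUrl(String serviceUrl) {
        conf.serviceUrl = serviceUrl;
        return this;
    }

    ClientBuilderSketch telemetry(TelemetryHandle telemetry) {
        this.telemetry = telemetry;
        return this;
    }

    ClientConf conf() {
        return conf;
    }

    TelemetryHandle telemetry() {
        return telemetry;
    }
}

final class SerializationCheck {
    // Java-serializes the object; throws NotSerializableException if it reaches a non-serializable field.
    static int serializedSize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.size();
    }
}
```

   The client would then receive the instance from the builder alongside the configuration, rather than reading it from the configuration object.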



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java:
##########
@@ -0,0 +1,59 @@
+package org.apache.pulsar.client.impl.metrics;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import org.apache.pulsar.client.api.MetricsCardinality;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+
+public class InstrumentProvider {
+
+    private final Meter meter;
+    private final MetricsCardinality metricsCardinality;
+
+    public InstrumentProvider(ClientConfigurationData conf) {
+        this.meter = conf.getOpenTelemetry().getMeter("org.apache.pulsar.client");

Review Comment:
   Since Pulsar's client is a library, as opposed to the Pulsar broker, we should also add the version to the meter. See: https://opentelemetry.io/docs/concepts/instrumentation/code-based/#configure-the-opentelemetry-api
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,

Review Comment:
   I asked in the OTel Slack since this contradicts their guidelines, but I don't have anything better. I'll get back to you on that.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);
+        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched.messages", Unit.Messages,
+                "Number of messages currently sitting in the consumer pre-fetch queue", attrs);
+        bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.preteched", Unit.Bytes,
+                "Total number of bytes currently sitting in the consumer pre-fetch queue", attrs);
+
+        consumerAcksCounter = ip.newCounter("pulsar.client.consumer.ack", Unit.Messages,
+                "Number of ack operations", attrs);

Review Comment:
   How about "The number of acknowledged messages"?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",

Review Comment:
   I suggest `pulsar.client.producer.send.duration`, or, if we want to keep "rpc" in the name, `pulsar.client.publish.rpc.send.duration`.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",

Review Comment:
   1. The term "latency" is not used in the places I checked in OTel.
   2. The term "publish" does not appear in `Producer.java`, so I think it's better to stick with "send". It also aligns well with the consumer's "receive".
   
   I suggest `pulsar.client.producer.message.send.duration`.
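   A sketch of what a `send.duration` instrument could measure: the time from the application's send call until the returned future completes. `SendTimer` is hypothetical, and the `DoubleConsumer` stands in for a histogram's record call. It records seconds, the unit OpenTelemetry semantic conventions recommend for durations.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.DoubleConsumer;
import java.util.function.Supplier;

// Hypothetical helper: times a send from the application's call until its future completes.
final class SendTimer {
    static <T> CompletableFuture<T> timed(Supplier<CompletableFuture<T>> send, DoubleConsumer recordSeconds) {
        long start = System.nanoTime();
        CompletableFuture<T> future = send.get();
        // Runs on success or failure. Assuming the future completes on broker ack,
        // the elapsed time also includes any client-side batching delay.
        future.whenComplete((result, error) -> recordSeconds.accept((System.nanoTime() - start) / 1e9));
        return future;
    }
}
```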
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -386,7 +402,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
 
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        consumersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,
+                "Counter of sessions opened", attrs.toBuilder().put("type", "consumer").build());
+        consumersClosedCounter = ip.newCounter("pulsar.client.session.closed", Unit.Sessions,
+                "Counter of sessions closed", attrs.toBuilder().put("type", "consumer").build());
+        messagesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Messages,
+                "Number of messages received", attrs);
+        bytesReceivedCounter = ip.newCounter("pulsar.client.received", Unit.Bytes,
+                "Bytes received", attrs);

Review Comment:
   This should mirror the previous description, but for bytes, e.g. "Number of bytes received".



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java:
##########
@@ -60,11 +64,31 @@ public class HttpLookupService implements LookupService {
     private static final String BasePathV1 = "lookup/v2/destination/";
     private static final String BasePathV2 = "lookup/v2/topic/";
 
-    public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
+    private final LatencyHistogram histoGetBroker;
+    private final LatencyHistogram histoGetTopicMetadata;
+    private final LatencyHistogram histoGetSchema;
+    private final LatencyHistogram histoListTopics;
+
+    public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
             throws PulsarClientException {
         this.httpClient = new HttpClient(conf, eventLoopGroup);
         this.useTls = conf.isUseTls();
         this.listenerName = conf.getListenerName();
+
+        Attributes attrs = Attributes.of(AttributeKey.stringKey("transport-type"), "binary");
+
+        histoGetBroker = instrumentProvider.newLatencyHistogram("pulsar.client.lookup",

Review Comment:
   Although OpenTelemetry supports creating an instrument with the same name more than once, I think a metrics class dedicated to lookups should be created and used in both the binary and HTTP lookup services. It's less confusing for the reader, and especially for the "searcher" trying to find where a metric is defined.
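   A minimal sketch of such a dedicated class, assuming a single `pulsar.client.lookup` instrument whose attributes distinguish transport and operation. `LookupMetrics`, its enums, and the in-memory sample map (a stand-in for an OpenTelemetry histogram) are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical single home for lookup metrics, shared by the binary and HTTP lookup services.
final class LookupMetrics {
    enum Transport { BINARY, HTTP }

    enum Operation { GET_BROKER, GET_TOPIC_METADATA, GET_SCHEMA, LIST_TOPICS }

    // Stand-in for one "pulsar.client.lookup" histogram: samples keyed by attribute set.
    private final Map<String, List<Double>> samples = new ConcurrentHashMap<>();

    private static String key(Transport transport, Operation operation) {
        return transport + "/" + operation;
    }

    // The only place lookup durations are recorded; callers pass elapsed nanoseconds.
    void record(Transport transport, Operation operation, long elapsedNanos) {
        samples.computeIfAbsent(key(transport, operation), k -> new CopyOnWriteArrayList<>())
                .add(elapsedNanos / 1e9); // stored in seconds
    }

    List<Double> samples(Transport transport, Operation operation) {
        return samples.getOrDefault(key(transport, operation), List.of());
    }
}
```

   Both `HttpLookupService` and the binary lookup service would receive the same `LookupMetrics` instance, so searching for the metric name leads to a single definition.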
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -268,6 +283,23 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        InstrumentProvider ip = client.instrumentProvider();
+        Attributes attrs = ip.getAttributes(topic);
+        latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.latency",
+                "Publish latency experienced by the application, includes client batching time", attrs);
+        rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.latency",
+                "Publish RPC latency experienced internally by the client when sending data to receiving an ack", attrs);
+        publishedBytesCounter = ip.newCounter("pulsar.client.producer.published",
+                Unit.Bytes, "Bytes published", attrs);
+        pendingMessagesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.messages.count", Unit.Messages,
+                "Pending messages for this producer", attrs);
+        pendingBytesCounter = ip.newUpDownCounter("pulsar.client.producer.pending.count", Unit.Bytes,
+                "Pending bytes for this producer", attrs);
+        producersOpenedCounter = ip.newCounter("pulsar.client.session.opened", Unit.Sessions,

Review Comment:
   Seeing the same metric defined twice (in the producer and in the consumer) is confusing.
   Also, the rule of thumb is that aggregating a single instrument across all of its attributes should have meaning. I'm not sure it's meaningful to sum producer and consumer sessions. Maybe each should have its own session metric?
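   One possible shape, sketched with plain `LongAdder`s instead of OpenTelemetry counters: separate opened/closed counters per entity kind, so that summing any single instrument over its attributes stays meaningful. The class names and the instrument names in the comments are assumptions.

```java
import java.util.concurrent.atomic.LongAdder;

// Opened/closed counters for one kind of client entity.
final class LifecycleCounters {
    private final LongAdder opened = new LongAdder();
    private final LongAdder closed = new LongAdder();

    void onOpened() {
        opened.increment();
    }

    void onClosed() {
        closed.increment();
    }

    long opened() {
        return opened.sum();
    }

    long closed() {
        return closed.sum();
    }
}

// Hypothetical: producers and consumers get separate instruments instead of sharing
// "pulsar.client.session.opened" with a "type" attribute.
final class ClientLifecycleMetrics {
    // e.g. "pulsar.client.producer.opened" / "pulsar.client.producer.closed" (names are assumptions)
    final LifecycleCounters producers = new LifecycleCounters();
    // e.g. "pulsar.client.consumer.opened" / "pulsar.client.consumer.closed" (names are assumptions)
    final LifecycleCounters consumers = new LifecycleCounters();
}
```

   Each instrument can then be summed across its remaining attributes (for example, topic) and still mean something.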



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -258,11 +270,17 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in
         this.idleState = new ClientCnxIdleState(this);
         this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
                 + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
+        this.connectionsOpenedCounter = instrumentProvider.newCounter("pulsar.client.connections.opened", Unit.Connections,
+                "Counter of connections opened", Attributes.empty());

Review Comment:
   The same applies to the closed counter below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
