This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8276f218f57 [improve][broker] Reduce number of OpenTelemetry consumer
attributes (#22837)
8276f218f57 is described below
commit 8276f218f576e81c212cedf8b3691f7c1a654e0e
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jun 4 12:57:21 2024 -0700
[improve][broker] Reduce number of OpenTelemetry consumer attributes
(#22837)
---
.../org/apache/pulsar/broker/service/Consumer.java | 36 +++++++++++++++
.../broker/stats/OpenTelemetryConsumerStats.java | 54 ++++++----------------
.../stats/OpenTelemetryConsumerStatsTest.java | 34 +-------------
.../opentelemetry/OpenTelemetryAttributes.java | 5 --
4 files changed, 51 insertions(+), 78 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c9f417c4bc4..19711bfa718 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
+import io.opentelemetry.api.common.Attributes;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
@@ -35,6 +36,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.Getter;
@@ -69,6 +71,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import
org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import
org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,6 +161,10 @@ public class Consumer {
@Getter
private final Instant connectedSince = Instant.now();
+ private volatile Attributes openTelemetryAttributes;
+ private static final AtomicReferenceFieldUpdater<Consumer, Attributes>
OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(Consumer.class,
Attributes.class, "openTelemetryAttributes");
+
public Consumer(Subscription subscription, SubType subType, String
topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
@@ -231,6 +238,8 @@ public class Consumer {
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();
this.schemaType = schemaType;
+
+ OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
}
@VisibleForTesting
@@ -263,6 +272,7 @@ public class Consumer {
this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
this.schemaType = null;
MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
+ OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
}
public SubType subType() {
@@ -1203,4 +1213,30 @@ public class Consumer {
}
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
+
+ public Attributes getOpenTelemetryAttributes() {
+ if (openTelemetryAttributes != null) {
+ return openTelemetryAttributes;
+ }
+ return OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
oldValue -> {
+ if (oldValue != null) {
+ return oldValue;
+ }
+ var topicName = TopicName.get(subscription.getTopic().getName());
+
+ var builder = Attributes.builder()
+ .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME,
consumerName)
+ .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID,
consumerId)
+ .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME,
subscription.getName())
+ .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE,
subType.toString())
+ .put(OpenTelemetryAttributes.PULSAR_DOMAIN,
topicName.getDomain().toString())
+ .put(OpenTelemetryAttributes.PULSAR_TENANT,
topicName.getTenant())
+ .put(OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicName.getNamespace())
+ .put(OpenTelemetryAttributes.PULSAR_TOPIC,
topicName.getPartitionedTopicName());
+ if (topicName.isPartitioned()) {
+ builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX,
topicName.getPartitionIndex());
+ }
+ return builder.build();
+ });
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
index 25af3959db3..09b487a8fa2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStats.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.stats;
-import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.Collection;
@@ -27,8 +26,6 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
public class OpenTelemetryConsumerStats implements AutoCloseable {
@@ -52,6 +49,9 @@ public class OpenTelemetryConsumerStats implements
AutoCloseable {
public static final String MESSAGE_UNACKNOWLEDGED_COUNTER =
"pulsar.broker.consumer.message.unack.count";
private final ObservableLongMeasurement messageUnacknowledgedCounter;
+ public static final String CONSUMER_BLOCKED_COUNTER =
"pulsar.broker.consumer.blocked";
+ private final ObservableLongMeasurement consumerBlockedCounter;
+
// Replaces pulsar_consumer_available_permits
public static final String MESSAGE_PERMITS_COUNTER =
"pulsar.broker.consumer.permit.count";
private final ObservableLongMeasurement messagePermitsCounter;
@@ -91,6 +91,12 @@ public class OpenTelemetryConsumerStats implements
AutoCloseable {
.setDescription("The total number of messages unacknowledged
by this consumer.")
.buildObserver();
+ consumerBlockedCounter = meter
+ .upDownCounterBuilder(CONSUMER_BLOCKED_COUNTER)
+ .setUnit("1")
+ .setDescription("Indicates whether the consumer is currently
blocked due to unacknowledged messages.")
+ .buildObserver();
+
messagePermitsCounter = meter
.upDownCounterBuilder(MESSAGE_PERMITS_COUNTER)
.setUnit("{permit}")
@@ -114,6 +120,7 @@ public class OpenTelemetryConsumerStats implements
AutoCloseable {
messageAckCounter,
messageRedeliverCounter,
messageUnacknowledgedCounter,
+ consumerBlockedCounter,
messagePermitsCounter);
}
@@ -123,48 +130,13 @@ public class OpenTelemetryConsumerStats implements
AutoCloseable {
}
private void recordMetricsForConsumer(Consumer consumer) {
- var subscription = consumer.getSubscription();
- var topicName = TopicName.get(subscription.getTopic().getName());
-
- var builder = Attributes.builder()
- .put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME,
consumer.consumerName())
- .put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID,
consumer.consumerId())
- .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
- consumer.getConnectedSince().getEpochSecond())
- .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME,
subscription.getName())
- .put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE,
consumer.subType().toString())
- .put(OpenTelemetryAttributes.PULSAR_DOMAIN,
topicName.getDomain().toString())
- .put(OpenTelemetryAttributes.PULSAR_TENANT,
topicName.getTenant())
- .put(OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicName.getNamespace())
- .put(OpenTelemetryAttributes.PULSAR_TOPIC,
topicName.getPartitionedTopicName());
- if (topicName.isPartitioned()) {
- builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX,
topicName.getPartitionIndex());
- }
- var clientAddress = consumer.getClientAddressAndPort();
- if (clientAddress != null) {
- builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS,
clientAddress);
- }
- var clientVersion = consumer.getClientVersion();
- if (clientVersion != null) {
- builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION,
clientVersion);
- }
- var metadataList = consumer.getMetadata()
- .entrySet()
- .stream()
- .map(e -> String.format("%s:%s", e.getKey(), e.getValue()))
- .toList();
- builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA,
metadataList);
- var attributes = builder.build();
-
+ var attributes = consumer.getOpenTelemetryAttributes();
messageOutCounter.record(consumer.getMsgOutCounter(), attributes);
bytesOutCounter.record(consumer.getBytesOutCounter(), attributes);
messageAckCounter.record(consumer.getMessageAckCounter(), attributes);
messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(),
attributes);
- messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
- Attributes.builder()
- .putAll(attributes)
- .put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED,
consumer.isBlocked())
- .build());
+ messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
attributes);
+ consumerBlockedCounter.record(consumer.isBlocked() ? 1 : 0,
attributes);
messagePermitsCounter.record(consumer.getAvailablePermits(),
attributes);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
index 5fcc6754b08..a05d7075cf3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryConsumerStatsTest.java
@@ -20,37 +20,25 @@ package org.apache.pulsar.broker.stats;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.doAnswer;
import io.opentelemetry.api.common.Attributes;
-import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerTestBase;
-import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.awaitility.Awaitility;
-import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class OpenTelemetryConsumerStatsTest extends BrokerTestBase {
- private BrokerInterceptor brokerInterceptor;
-
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
- brokerInterceptor =
- Mockito.mock(BrokerInterceptor.class,
Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
super.baseSetup();
}
@@ -64,7 +52,6 @@ public class OpenTelemetryConsumerStatsTest extends
BrokerTestBase {
protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
super.customizeMainPulsarTestContextBuilder(builder);
builder.enableOpenTelemetry(true);
- builder.brokerInterceptor(brokerInterceptor);
}
@Test(timeOut = 30_000)
@@ -78,14 +65,6 @@ public class OpenTelemetryConsumerStatsTest extends
BrokerTestBase {
var subscriptionName = BrokerTestUtil.newUniqueName("test");
var receiverQueueSize = 100;
- // Intercept calls to create consumer, in order to fetch client
information.
- var consumerRef = new AtomicReference<Consumer>();
- doAnswer(invocation -> {
- consumerRef.compareAndSet(null, invocation.getArgument(1));
- return null;
- }).when(brokerInterceptor)
- .consumerCreated(any(), argThat(arg ->
arg.getSubscription().getName().equals(subscriptionName)), any());
-
@Cleanup
var consumer = pulsarClient.newConsumer()
.topic(topicName)
@@ -94,12 +73,8 @@ public class OpenTelemetryConsumerStatsTest extends
BrokerTestBase {
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.receiverQueueSize(receiverQueueSize)
- .property("prop1", "value1")
.subscribe();
- Awaitility.await().until(() -> consumerRef.get() != null);
- var serverConsumer = consumerRef.get();
-
@Cleanup
var producer = pulsarClient.newProducer()
.topic(topicName)
@@ -121,11 +96,6 @@ public class OpenTelemetryConsumerStatsTest extends
BrokerTestBase {
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE,
SubscriptionType.Shared.toString())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME,
consumer.getConsumerName())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, 0)
- .put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
- serverConsumer.getConnectedSince().getEpochSecond())
- .put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS,
serverConsumer.getClientAddressAndPort())
- .put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION,
serverConsumer.getClientVersion())
- .put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA,
List.of("prop1:value1"))
.build();
Awaitility.await().untilAsserted(() -> {
@@ -141,9 +111,9 @@ public class OpenTelemetryConsumerStatsTest extends
BrokerTestBase {
actual ->
assertThat(actual).isGreaterThanOrEqualTo(receiverQueueSize - messageCount -
ackCount));
var unAckCount = messageCount - ackCount;
- assertMetricLongSumValue(metrics,
OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER,
-
attributes.toBuilder().put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED,
false).build(),
+ assertMetricLongSumValue(metrics,
OpenTelemetryConsumerStats.MESSAGE_UNACKNOWLEDGED_COUNTER, attributes,
unAckCount);
+ assertMetricLongSumValue(metrics,
OpenTelemetryConsumerStats.CONSUMER_BLOCKED_COUNTER, attributes, 0);
assertMetricLongSumValue(metrics,
OpenTelemetryConsumerStats.MESSAGE_REDELIVER_COUNTER, attributes,
actual ->
assertThat(actual).isGreaterThanOrEqualTo(unAckCount));
});
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 4f898b382e6..a3e8a0c1e72 100644
---
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -76,11 +76,6 @@ public interface OpenTelemetryAttributes {
*/
AttributeKey<Long> PULSAR_CONSUMER_ID =
AttributeKey.longKey("pulsar.consumer.id");
- /**
- * Indicates whether the consumer is currently blocked on unacknowledged
messages or not.
- */
- AttributeKey<Boolean> PULSAR_CONSUMER_BLOCKED =
AttributeKey.booleanKey("pulsar.consumer.blocked");
-
/**
* The consumer metadata properties, as a list of "key:value" pairs.
*/