This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a373258d043 [feat][client] PIP-446: Support Native OpenTelemetry
Tracing in Pulsar Java Client (#24873)
a373258d043 is described below
commit a373258d0430aa04cc05c4c46b0ac5e0e38153cb
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Jan 14 22:10:41 2026 +0800
[feat][client] PIP-446: Support Native OpenTelemetry Tracing in Pulsar Java
Client (#24873)
Co-authored-by: Penghui Li <[email protected]>
---
.../OpenTelemetryTracingIntegrationTest.java | 826 +++++++++++++++++++++
.../apache/pulsar/client/api/ClientBuilder.java | 38 +
.../apache/pulsar/client/api/TraceableMessage.java | 55 ++
.../pulsar/client/api/TraceableMessageId.java | 56 ++
pulsar-client/TRACING.md | 406 ++++++++++
.../pulsar/client/impl/ClientBuilderImpl.java | 6 +
.../apache/pulsar/client/impl/ConsumerBase.java | 1 +
.../pulsar/client/impl/ConsumerBuilderImpl.java | 16 +-
.../apache/pulsar/client/impl/MessageIdImpl.java | 23 +-
.../org/apache/pulsar/client/impl/MessageImpl.java | 22 +-
.../pulsar/client/impl/NegativeAcksTracker.java | 27 +-
.../pulsar/client/impl/ProducerBuilderImpl.java | 15 +-
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../pulsar/client/impl/TopicMessageIdImpl.java | 21 +-
.../pulsar/client/impl/TopicMessageImpl.java | 23 +-
.../client/impl/conf/ClientConfigurationData.java | 7 +
.../client/impl/metrics/InstrumentProvider.java | 10 +
.../tracing/OpenTelemetryConsumerInterceptor.java | 373 ++++++++++
.../tracing/OpenTelemetryProducerInterceptor.java | 143 ++++
.../pulsar/client/impl/tracing/TracingContext.java | 194 +++++
.../pulsar/client/impl/tracing/package-info.java | 139 ++++
.../client/impl/ConsumerBuilderImplTest.java | 8 +-
.../client/impl/ProducerBuilderImplTest.java | 2 +
.../impl/tracing/OpenTelemetryTracingTest.java | 212 ++++++
.../client/impl/tracing/TracingExampleTest.java | 179 +++++
.../pulsar/functions/instance/ContextImplTest.java | 2 +
26 files changed, 2793 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
new file mode 100644
index 00000000000..fcc0bd1776e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java
@@ -0,0 +1,826 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration tests for OpenTelemetry tracing with real broker.
+ * Note: These tests may be timing-dependent and could be flaky in CI environments.
+ * They verify end-to-end tracing functionality with actual Pulsar broker.
+ */
+@Test(groups = "broker")
+public class OpenTelemetryTracingIntegrationTest extends BrokerTestBase {
+
+ private InMemorySpanExporter spanExporter;
+ private OpenTelemetrySdk openTelemetry;
+ private SdkTracerProvider tracerProvider;
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ // Setup OpenTelemetry SDK with in-memory exporter
+ spanExporter = InMemorySpanExporter.create();
+ tracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
+ .build();
+
+ openTelemetry = OpenTelemetrySdk.builder()
+ .setTracerProvider(tracerProvider)
+ .build();
+
+ baseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ if (openTelemetry != null) {
+ openTelemetry.close();
+ }
+ }
+
+ private void flushSpans() throws Exception {
+ tracerProvider.forceFlush().join(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testBasicProducerConsumerTracing() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-basic-tracing";
+ spanExporter.reset();
+
+ // Create client with tracing enabled
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ // Create producer
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ // Create consumer
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ // Send and receive message
+ MessageId sentMsgId = producer.send("test-message");
+ assertNotNull(sentMsgId);
+
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ assertEquals(msg.getValue(), "test-message");
+ consumer.acknowledge(msg);
+
+ // Close client to force span flush
+ producer.close();
+ consumer.close();
+ client.close();
+
+ // Force flush tracer provider
+ flushSpans();
+
+ // Verify spans - at least one span should be created
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ assertTrue(spans.size() > 0, "Expected at least one span, got: " + spans.size());
+
+ // Verify producer span if present
+ spans.stream()
+ .filter(s -> s.getKind() == SpanKind.PRODUCER)
+ .findFirst()
+ .ifPresent(producerSpan -> {
+ assertEquals(producerSpan.getName(), "send " + topic);
+ assertEquals(producerSpan.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar");
+ });
+
+ // Verify consumer span if present
+ spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .findFirst()
+ .ifPresent(consumerSpan -> {
+ assertEquals(consumerSpan.getName(), "process " + topic);
+ assertEquals(consumerSpan.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar");
+ assertEquals(consumerSpan.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey(
+ "messaging.pulsar.acknowledgment.type")),
+ "acknowledge");
+ });
+ }
+
+ @Test
+ public void testNegativeAcknowledgment() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-negative-ack";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-sub")
+ .negativeAckRedeliveryDelay(0, TimeUnit.SECONDS)
+ .subscribe();
+
+ // Send message
+ producer.send("test-message");
+
+ // Receive and negative acknowledge
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.negativeAcknowledge(msg);
+
+ Thread.sleep(3000);
+
+ // Close to ensure negative ack is processed
+ producer.close();
+ consumer.close();
+ client.close();
+
+ // Wait for spans
+ flushSpans();
+
+ // Find consumer span
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ SpanData consumerSpan = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Consumer span not found. Total spans: "
+ + spans.size() + ", kinds: " + spans.stream()
+ .map(s -> s.getKind().toString()).collect(java.util.stream.Collectors.joining(", "))));
+
+ // Verify negative ack attribute
+ assertEquals(consumerSpan.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")),
+ "negative_acknowledge");
+ }
+
+ @Test
+ public void testCumulativeAcknowledgment() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-cumulative-ack";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+
+ // Send multiple messages
+ for (int i = 0; i < 5; i++) {
+ producer.send("message-" + i);
+ }
+
+ // Receive all messages
+ Message<String> lastMsg = null;
+ for (int i = 0; i < 5; i++) {
+ lastMsg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(lastMsg);
+ }
+
+ // Cumulative acknowledge last message
+ consumer.acknowledgeCumulative(lastMsg);
+
+ // Wait for spans
+ flushSpans();
+
+ // Verify all consumer spans have cumulative_acknowledge attribute
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ long consumerSpansWithCumulativeAck = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey(
+ "messaging.pulsar.acknowledgment.type"))))
+ .count();
+
+ assertEquals(consumerSpansWithCumulativeAck, 5);
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
+ @Test
+ public void testAcknowledgmentTimeout() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-ack-timeout";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-sub")
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .subscribe();
+
+ // Send message
+ producer.send("test-message");
+
+ // Receive but don't acknowledge
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+
+ // Note: Ack timeout behavior varies based on subscription type and broker implementation
+ // For Shared subscription, ack timeout triggers redelivery but span may already be ended
+ // This test verifies the basic tracing flow works even with ack timeout configured
+
+ // Acknowledge to properly end the span
+ consumer.acknowledge(msg);
+
+ // Wait for spans
+ flushSpans();
+
+ // Verify consumer span exists with acknowledge attribute
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ boolean foundConsumerSpan = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .anyMatch(s -> s.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey(
+ "messaging.pulsar.acknowledgment.type")) != null);
+
+ assertTrue(foundConsumerSpan, "Expected consumer span with acknowledgment.type attribute");
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
+ @Test
+ public void testMultiTopicConsumerTracing() throws Exception {
+ String topic1 = "persistent://prop/ns-abc/test-multi-topic-1";
+ String topic2 = "persistent://prop/ns-abc/test-multi-topic-2";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer1 = client.newProducer(Schema.STRING)
+ .topic(topic1)
+ .create();
+
+ Producer<String> producer2 = client.newProducer(Schema.STRING)
+ .topic(topic2)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topics(List.of(topic1, topic2))
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ // Send messages to both topics
+ producer1.send("message-topic1");
+ producer2.send("message-topic2");
+
+ // Receive and acknowledge both messages
+ Set<String> receivedTopics = new java.util.HashSet<>();
+ for (int i = 0; i < 2; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ receivedTopics.add(msg.getTopicName());
+ consumer.acknowledge(msg);
+ }
+
+ assertEquals(receivedTopics.size(), 2);
+
+ // Wait for spans
+ flushSpans();
+
+ // Verify spans for both topics
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ long consumerSpans = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .count();
+
+ assertEquals(consumerSpans, 2);
+
+ producer1.close();
+ producer2.close();
+ consumer.close();
+ client.close();
+ }
+
+ @Test
+ public void testTracingWithoutGlobalEnable() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-no-global-tracing";
+ spanExporter.reset();
+
+ // Create client with OpenTelemetry but tracing NOT enabled
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(false) // Explicitly disabled
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ // Send and receive message
+ producer.send("test-message");
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+
+ // Wait for potential spans
+ flushSpans();
+
+ // Verify NO spans were created
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ assertEquals(spans.size(), 0, "Expected no spans when tracing is disabled");
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
+ @Test
+ public void testSharedSubscriptionTracing() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-shared-subscription";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-shared-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ // Send messages
+ for (int i = 0; i < 3; i++) {
+ producer.send("message-" + i);
+ }
+
+ // Receive and acknowledge individually
+ for (int i = 0; i < 3; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+
+ producer.close();
+ consumer.close();
+ client.close();
+
+ flushSpans();
+
+ // Verify individual acks for Shared subscription
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ long consumerSpansWithIndividualAck = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .filter(s -> "acknowledge".equals(s.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey(
+ "messaging.pulsar.acknowledgment.type"))))
+ .count();
+
+ assertEquals(consumerSpansWithIndividualAck, 3);
+ }
+
+ @Test
+ public void testKeySharedSubscriptionTracing() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-key-shared-subscription";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-key-shared-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ // Send messages with keys
+ for (int i = 0; i < 3; i++) {
+ producer.newMessage()
+ .key("key-" + (i % 2))
+ .value("message-" + i)
+ .send();
+ }
+
+ // Receive and acknowledge
+ for (int i = 0; i < 3; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+
+ producer.close();
+ consumer.close();
+ client.close();
+
+ flushSpans();
+
+ // Verify spans for Key_Shared subscription
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ long consumerSpans = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .count();
+
+ assertEquals(consumerSpans, 3);
+ }
+
+ @Test
+ public void testExclusiveSubscriptionTracing() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-exclusive-subscription";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-exclusive-sub")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ // Send messages
+ for (int i = 0; i < 3; i++) {
+ producer.send("message-" + i);
+ }
+
+ // Receive all messages
+ Message<String> lastMsg = null;
+ for (int i = 0; i < 3; i++) {
+ lastMsg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(lastMsg);
+ }
+
+ // Cumulative acknowledge last message
+ consumer.acknowledgeCumulative(lastMsg);
+
+ producer.close();
+ consumer.close();
+ client.close();
+
+ flushSpans();
+
+ // Verify cumulative ack for Exclusive subscription
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ long consumerSpansWithCumulativeAck = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey(
+ "messaging.pulsar.acknowledgment.type"))))
+ .count();
+
+ assertEquals(consumerSpansWithCumulativeAck, 3);
+ }
+
+ @Test
+ public void testFailoverSubscriptionWithCumulativeAck() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-failover-cumulative";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-failover-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+
+ // Send messages
+ for (int i = 0; i < 5; i++) {
+ producer.send("message-" + i);
+ }
+
+ // Receive all messages
+ Message<String> lastMsg = null;
+ for (int i = 0; i < 5; i++) {
+ lastMsg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(lastMsg);
+ }
+
+ // Cumulative acknowledge last message
+ consumer.acknowledgeCumulative(lastMsg);
+
+ producer.close();
+ consumer.close();
+ client.close();
+
+ flushSpans();
+
+ // Verify all spans ended with cumulative ack
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ long consumerSpansWithCumulativeAck = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey(
+ "messaging.pulsar.acknowledgment.type"))))
+ .count();
+
+ assertEquals(consumerSpansWithCumulativeAck, 5);
+ }
+
+ @Test
+ public void testMultiTopicConsumerWithCumulativeAck() throws Exception {
+ String topic1 = "persistent://prop/ns-abc/test-multi-cumulative-1";
+ String topic2 = "persistent://prop/ns-abc/test-multi-cumulative-2";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer1 = client.newProducer(Schema.STRING)
+ .topic(topic1)
+ .create();
+
+ Producer<String> producer2 = client.newProducer(Schema.STRING)
+ .topic(topic2)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topics(List.of(topic1, topic2))
+ .subscriptionName("test-multi-cumulative-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+
+ // Send messages to both topics
+ producer1.send("topic1-msg1");
+ producer1.send("topic1-msg2");
+ producer2.send("topic2-msg1");
+ producer2.send("topic2-msg2");
+
+ // Receive messages from both topics
+ Message<String> topic1LastMsg = null;
+ Message<String> topic2LastMsg = null;
+ for (int i = 0; i < 4; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ if (msg.getTopicName().contains("multi-cumulative-1")) {
+ topic1LastMsg = msg;
+ } else {
+ topic2LastMsg = msg;
+ }
+ }
+
+ // Cumulative acknowledge for each topic separately
+ if (topic1LastMsg != null) {
+ consumer.acknowledgeCumulative(topic1LastMsg);
+ }
+ if (topic2LastMsg != null) {
+ consumer.acknowledgeCumulative(topic2LastMsg);
+ }
+
+ producer1.close();
+ producer2.close();
+ consumer.close();
+ client.close();
+
+ flushSpans();
+
+ // Verify cumulative ack only affects spans from the same topic
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ long consumerSpansWithCumulativeAck = spans.stream()
+ .filter(s -> s.getKind() == SpanKind.CONSUMER)
+ .filter(s -> "cumulative_acknowledge".equals(s.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey(
+ "messaging.pulsar.acknowledgment.type"))))
+ .count();
+
+ // Should have cumulative ack for messages from both topics
+ assertEquals(consumerSpansWithCumulativeAck, 4);
+ }
+
+ @Test
+ public void testBatchMessagesTracing() throws Exception {
+ String topic = "persistent://prop/ns-abc/test-batch-tracing";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(5)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ // Send batch of messages
+ for (int i = 0; i < 5; i++) {
+ producer.sendAsync("message-" + i);
+ }
+ producer.flush();
+
+ // Receive and acknowledge all messages
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ consumer.acknowledge(msg);
+ }
+
+ producer.close();
+ consumer.close();
+ client.close();
+
+ // Wait for spans
+ flushSpans();
+
+ // Verify spans for batched messages
+ // Note: Tracing behavior may vary for batched messages depending on when spans are created
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ assertTrue(spans.size() > 0, "Expected at least some spans for batched messages");
+
+ // Verify that spans have correct attributes
+ spans.stream()
+ .filter(s -> s.getKind() == SpanKind.PRODUCER || s.getKind() == SpanKind.CONSUMER)
+ .forEach(span -> {
+ assertNotNull(span.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")));
+ assertEquals(span.getAttributes().get(
+ io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar");
+ });
+ }
+
+ @Test
+ public void testCustomSpan() throws Exception {
+ String topic = "persistent://prop/ns-abc/testCustomSpan";
+ spanExporter.reset();
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .openTelemetry(openTelemetry)
+ .enableTracing(true)
+ .build();
+
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxMessages(5)
+ .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+ .create();
+
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ // Send batch of messages
+ for (int i = 0; i < 5; i++) {
+ producer.sendAsync("message-" + i);
+ }
+ producer.flush();
+
+ InstrumentProvider instrumentProvider = ((PulsarClientImpl) client).instrumentProvider();
+ final Tracer tracer = instrumentProvider.getTracer();
+ String customSpanName = "business-logic";
+ // Receive and acknowledge all messages
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ Span processingSpan = tracer.spanBuilder(customSpanName)
+ .setSpanKind(SpanKind.CLIENT).startSpan();
+ try (Scope scope = processingSpan.makeCurrent()) {
+ processingSpan.setStatus(StatusCode.OK);
+ consumer.acknowledge(msg);
+ } catch (Exception e) {
+ processingSpan.recordException(e);
+ processingSpan.setStatus(StatusCode.ERROR);
+ consumer.negativeAcknowledge(msg);
+ throw e;
+ } finally {
+ processingSpan.end();
+ }
+ }
+
+ producer.close();
+ consumer.close();
+ client.close();
+
+ // Wait for spans
+ flushSpans();
+
+ List<SpanData> spans = spanExporter.getFinishedSpanItems();
+ assertTrue(spans.size() > 0, "Expected at least some spans for batched messages");
+
+ spans.stream()
+ .filter(s -> s.getName().equals(customSpanName))
+ .forEach(span -> {
+ assertEquals(span.getKind(), SpanKind.CLIENT);
+ });
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index d31d42bbe63..7ac063d227b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -615,6 +615,44 @@ public interface ClientBuilder extends Serializable, Cloneable {
*/
ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
+ /**
+ * Enable OpenTelemetry distributed tracing.
+ *
+ * <p>When enabled, interceptors are automatically added to all producers and consumers
+ * to create spans for message publishing and consumption, and automatically propagate trace context
+ * via message properties.
+ *
+ * <p>This method is useful when OpenTelemetry is configured globally (e.g., via Java Agent or
+ * {@link io.opentelemetry.api.GlobalOpenTelemetry}) and you just want to enable tracing interceptors
+ * without explicitly setting an OpenTelemetry instance.
+ *
+ * <p>Example with Java Agent:
+ * <pre>{@code
+ * // When using -javaagent:opentelemetry-javaagent.jar
+ * PulsarClient client = PulsarClient.builder()
+ * .serviceUrl("pulsar://localhost:6650")
+ * .enableTracing(true) // Use GlobalOpenTelemetry
+ * .build();
+ * }</pre>
+ *
+ * <p>Example with GlobalOpenTelemetry:
+ * <pre>{@code
+ * // Configure GlobalOpenTelemetry elsewhere in your application
+ * GlobalOpenTelemetry.set(myOpenTelemetry);
+ *
+ * // Just enable tracing in the client
+ * PulsarClient client = PulsarClient.builder()
+ * .serviceUrl("pulsar://localhost:6650")
+ * .enableTracing(true)
+ * .build();
+ * }</pre>
+ *
+ * @param tracingEnabled whether to enable tracing (default: false)
+ * @return the client builder instance
+ * @since 4.2.0
+ */
+ ClientBuilder enableTracing(boolean tracingEnabled);
+
/**
* The clock used by the pulsar client.
*
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java
new file mode 100644
index 00000000000..5e4c61778ab
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import io.opentelemetry.api.trace.Span;
+
+/**
+ * Extension of {@link Message} interface that supports OpenTelemetry tracing.
+ * <p>
+ * This interface allows attaching OpenTelemetry spans directly to messages,
+ * eliminating the need for external tracking via maps.
+ * <p>
+ * The span lifecycle:
+ * <ul>
+ * <li>Producer: Span is created before send and attached to the message.
+ * When the send is acknowledged, the span is retrieved and completed.</li>
+ * <li>Consumer: Span is created when message is received and attached to the message.
+ * When the message is acknowledged, the span is retrieved and completed.</li>
+ * </ul>
+ */
+public interface TraceableMessage {
+
+ /**
+ * Set the OpenTelemetry span associated with this message.
+ * <p>
+ * This method is called by tracing interceptors to attach a span to the message
+ * for later retrieval when completing the span.
+ *
+ * @param span the span to associate with this message, or null to clear
+ */
+ void setTracingSpan(Span span);
+
+ /**
+ * Get the OpenTelemetry span associated with this message.
+ *
+ * @return the span associated with this message, or null if no span is set
+ */
+ Span getTracingSpan();
+}
\ No newline at end of file
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java
new file mode 100644
index 00000000000..d8470184ccc
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TraceableMessageId.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import io.opentelemetry.api.trace.Span;
+
+/**
+ * Extension interface that allows {@link MessageId} implementations to support OpenTelemetry tracing.
+ * <p>
+ * This interface enables attaching OpenTelemetry spans directly to message IDs,
+ * allowing span retrieval in acknowledge callbacks which only receive MessageId,
+ * not the full Message object.
+ * <p>
+ * This is particularly useful for consumer-side tracing where:
+ * <ul>
+ * <li>A span is created when a message is received (in beforeConsume)</li>
+ * <li>The span is attached to the message's MessageId</li>
+ * <li>When the message is acknowledged, the span can be retrieved from the MessageId
+ * and completed, even though the acknowledge callback only provides MessageId</li>
+ * </ul>
+ */
+public interface TraceableMessageId {
+
+ /**
+ * Set the OpenTelemetry span associated with this message ID.
+ * <p>
+ * This method is called by tracing interceptors to attach a span to the message ID
+ * for later retrieval in acknowledge callbacks.
+ *
+ * @param span the span to associate with this message ID, or {@code null} to clear
+ */
+ void setTracingSpan(Span span);
+
+ /**
+ * Get the OpenTelemetry span associated with this message ID.
+ *
+ * @return the span associated with this message ID, or {@code null} if no span is set
+ */
+ Span getTracingSpan();
+}
diff --git a/pulsar-client/TRACING.md b/pulsar-client/TRACING.md
new file mode 100644
index 00000000000..4dc8d26809b
--- /dev/null
+++ b/pulsar-client/TRACING.md
@@ -0,0 +1,406 @@
+# OpenTelemetry Tracing for Pulsar Java Client
+
+This document describes how to use OpenTelemetry distributed tracing with the
Pulsar Java client.
+
+## Overview
+
+The Pulsar Java client provides built-in support for OpenTelemetry distributed
tracing. This allows you to:
+
+- Trace message publishing from producer to broker
+- Trace message consumption from broker to consumer
+- Propagate trace context across services via message properties
+- Extract trace context from external sources (e.g., HTTP requests)
+- Create end-to-end traces across your distributed system
+
+## Features
+
+### Producer Tracing
+
+Producer tracing creates spans for:
+- **send** - Span starts when `send()` or `sendAsync()` is called and
completes when broker acknowledges receipt
+
+### Consumer Tracing
+
+Consumer tracing creates spans for:
+- **process** - Span starts when message is received and completes when
message is acknowledged, negatively acknowledged, or ack timeout occurs
+
+### Trace Context Propagation
+
+Trace context is automatically propagated using W3C TraceContext format:
+- `traceparent` - Contains trace ID, span ID, and trace flags
+- `tracestate` - Contains vendor-specific trace information
+
+Context is injected into and extracted from message properties, enabling
seamless trace propagation across services.
+
+## Quick Start
+
+### 1. Add Dependencies
+
+The Pulsar client already includes OpenTelemetry API dependencies. You'll need
to add the SDK and exporters:
+
+```xml
+<dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk</artifactId>
+ <version>${opentelemetry.version}</version>
+</dependency>
+<dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-exporter-otlp</artifactId>
+ <version>${opentelemetry.version}</version>
+</dependency>
+```
+
+### 2. Enable Tracing
+
+There are three ways to enable tracing:
+
+#### Option 1: Using OpenTelemetry Java Agent (Easiest)
+
+```bash
+# Start your application with the Java Agent
+java -javaagent:opentelemetry-javaagent.jar \
+ -Dotel.service.name=my-service \
+ -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
+ -jar your-application.jar
+```
+
+```java
+// Just enable tracing - uses GlobalOpenTelemetry from the agent
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .enableTracing(true) // That's it!
+ .build();
+```
+
+#### Option 2: With Explicit OpenTelemetry Instance
+
+```java
+OpenTelemetry openTelemetry = // configure your OpenTelemetry instance
+
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .openTelemetry(openTelemetry, true) // Set OpenTelemetry AND enable
tracing
+ .build();
+```
+
+#### Option 3: Using GlobalOpenTelemetry
+
+```java
+// Configure GlobalOpenTelemetry once in your application
+GlobalOpenTelemetry.set(myOpenTelemetry);
+
+// Enable tracing in the client - uses GlobalOpenTelemetry
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .enableTracing(true)
+ .build();
+```
+
+**What happens when tracing is enabled:**
+- **Create spans** for producer send operations
+- **Inject trace context** into message properties automatically
+- **Create spans** for consumer receive/ack operations
+- **Extract trace context** from message properties automatically
+- Link all spans to create end-to-end distributed traces
+
+### 3. Manual Interceptor Configuration (Advanced)
+
+If you prefer manual control, you can add interceptors explicitly:
+
+```java
+import org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor;
+import org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor;
+
+// Create client (tracing not enabled globally)
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .openTelemetry(openTelemetry)
+ .build();
+
+// Add interceptor manually to specific producer
+Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic("my-topic")
+ .intercept(new OpenTelemetryProducerInterceptor())
+ .create();
+
+// Add interceptor manually to specific consumer
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic("my-topic")
+ .subscriptionName("my-subscription")
+ .intercept(new OpenTelemetryConsumerInterceptor<>())
+ .subscribe();
+```
+
+## Advanced Usage
+
+### End-to-End Tracing Example
+
+This example shows how to create a complete trace from an HTTP request through
Pulsar to a consumer:
+
+```java
+// Service 1: HTTP API that publishes to Pulsar
+@POST
+@Path("/order")
+public Response createOrder(@Context HttpHeaders headers, Order order) {
+ // Extract trace context from incoming HTTP request
+ Context context = TracingProducerBuilder.extractFromHeaders(
+ convertHeaders(headers));
+
+ // Publish to Pulsar with trace context
+ TracingProducerBuilder tracingBuilder = new TracingProducerBuilder();
+ producer.newMessage()
+ .value(order)
+ .let(builder -> tracingBuilder.injectContext(builder, context))
+ .send();
+
+ return Response.accepted().build();
+}
+
+// Service 2: Pulsar consumer that processes orders
+Consumer<Order> consumer = client.newConsumer(Schema.JSON(Order.class))
+ .topic("orders")
+ .subscriptionName("order-processor")
+ .intercept(new OpenTelemetryConsumerInterceptor<>())
+ .subscribe();
+
+while (true) {
+ Message<Order> msg = consumer.receive();
+ // Trace context is automatically extracted
+ // Any spans created here will be part of the same trace
+ processOrder(msg.getValue());
+ consumer.acknowledge(msg);
+}
+```
+
+### Custom Span Creation
+
+You can create custom spans during message processing:
+
+```java
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Scope;
+
+Tracer tracer = GlobalOpenTelemetry.get().getTracer("my-app");
+
+Message<String> msg = consumer.receive();
+
+// Create a custom span for processing
+Span span = tracer.spanBuilder("process-message")
+ .setSpanKind(SpanKind.INTERNAL)
+ .startSpan();
+
+try (Scope scope = span.makeCurrent()) {
+ // Your processing logic
+ processMessage(msg.getValue());
+ span.setStatus(StatusCode.OK);
+} catch (Exception e) {
+ span.recordException(e);
+ span.setStatus(StatusCode.ERROR);
+ throw e;
+} finally {
+ span.end();
+ consumer.acknowledge(msg);
+}
+```
+
+## Configuration
+
+### Compatibility with OpenTelemetry Java Agent
+
+This implementation is **fully compatible** with the [OpenTelemetry Java
Instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/pulsar)
for Pulsar:
+
+- Both use **W3C TraceContext** format (traceparent, tracestate headers)
+- Both propagate context via **message properties**
+- **No conflicts**: Our implementation checks if trace context is already
present (from Java Agent) and avoids duplicate injection
+- You can use either approach or both together
+
+### Using OpenTelemetry Java Agent
+
+The easiest way to enable tracing is using the OpenTelemetry Java Agent
(automatic instrumentation):
+
+```bash
+java -javaagent:path/to/opentelemetry-javaagent.jar \
+ -Dotel.service.name=my-service \
+ -Dotel.exporter.otlp.endpoint=http://localhost:4317 \
+ -jar your-application.jar
+```
+
+**Note**: When using the Java Agent, you don't need to call
`.openTelemetry(otel, true)` as the agent automatically instruments Pulsar.
However, calling it won't cause conflicts.
+
+### Programmatic Configuration
+
+You can also configure OpenTelemetry programmatically:
+
+```java
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
+
+OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
+ .setEndpoint("http://localhost:4317")
+ .build();
+
+SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
+ .build();
+
+OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder()
+ .setTracerProvider(tracerProvider)
+ .buildAndRegisterGlobal();
+```
+
+### Environment Variables
+
+Configure via environment variables:
+
+```bash
+export OTEL_SERVICE_NAME=my-service
+export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
+export OTEL_TRACES_EXPORTER=otlp
+export OTEL_METRICS_EXPORTER=otlp
+```
+
+## Span Attributes
+
+The tracing implementation adds the following attributes to spans following
the [OpenTelemetry messaging semantic
conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/):
+
+### Producer Spans
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name
+- `messaging.operation.name`: "send"
+- `messaging.message.id`: Message ID (added when broker confirms)
+
+**Span naming**: `send {topic}` (e.g., "send my-topic")
+
+### Consumer Spans
+- `messaging.system`: "pulsar"
+- `messaging.destination.name`: Topic name
+- `messaging.destination.subscription.name`: Subscription name
+- `messaging.operation.name`: "process"
+- `messaging.message.id`: Message ID
+- `messaging.pulsar.acknowledgment.type`: How the message was acknowledged
+ - `"acknowledge"`: Normal individual acknowledgment
+ - `"cumulative_acknowledge"`: Cumulative acknowledgment
+ - `"negative_acknowledge"`: Message negatively acknowledged (will retry)
+ - `"ack_timeout"`: Acknowledgment timeout occurred (will retry)
+
+**Span naming**: `process {topic}` (e.g., "process my-topic")
+
+## Span Lifecycle and Acknowledgment Behavior
+
+Understanding how spans are handled for different acknowledgment scenarios.
Every consumer span includes a `messaging.pulsar.acknowledgment.type` attribute
indicating how it was completed:
+
+### Successful Acknowledgment
+- Span ends with **OK** status
+- Attribute: `messaging.pulsar.acknowledgment.type = "acknowledge"`
+
+### Cumulative Acknowledgment
+- Span ends with **OK** status
+- Attribute: `messaging.pulsar.acknowledgment.type = "cumulative_acknowledge"`
+- All spans up to the acknowledged position are ended with this attribute
+
+### Negative Acknowledgment
+- Span ends with **OK** status (not an error)
+- Attribute: `messaging.pulsar.acknowledgment.type = "negative_acknowledge"`
+- This is normal flow, not a failure - the message will be redelivered and a
new span will be created
+
+### Acknowledgment Timeout
+- Span ends with **OK** status (not an error)
+- Attribute: `messaging.pulsar.acknowledgment.type = "ack_timeout"`
+- This is expected behavior when `ackTimeout` is configured - the message will
be redelivered and a new span will be created
+
+### Application Exception During Processing
+- If your application code throws an exception, create a child span and mark
it with ERROR status
+- The consumer span itself will end normally when you call
`negativeAcknowledge()`
+- This provides clear separation between messaging operations (OK) and
application logic (ERROR)
+
+**Example - Separating messaging and application errors**:
+```java
+Message<String> msg = consumer.receive();
+Span processingSpan = tracer.spanBuilder("business-logic").startSpan();
+try (Scope scope = processingSpan.makeCurrent()) {
+ processMessage(msg.getValue());
+ processingSpan.setStatus(StatusCode.OK);
+ consumer.acknowledge(msg); // Consumer span ends with
acknowledgment.type="acknowledge"
+} catch (Exception e) {
+ processingSpan.recordException(e);
+ processingSpan.setStatus(StatusCode.ERROR); // Business logic failed
+ consumer.negativeAcknowledge(msg); // Consumer span ends with
acknowledgment.type="negative_acknowledge"
+ throw e;
+} finally {
+ processingSpan.end();
+}
+```
+
+### Querying by Acknowledgment Type
+
+The `messaging.pulsar.acknowledgment.type` attribute allows you to filter and
analyze spans:
+
+**Example queries in your tracing backend**:
+- Find all retried messages: `messaging.pulsar.acknowledgment.type =
"negative_acknowledge" OR "ack_timeout"`
+- Calculate retry rate: `count(negative_acknowledge) / count(acknowledge)`
+- Identify timeout issues: `messaging.pulsar.acknowledgment.type =
"ack_timeout"`
+- Analyze cumulative vs individual acks: Group by
`messaging.pulsar.acknowledgment.type`
+
+## Best Practices
+
+1. **Always use interceptors**: Add tracing interceptors to both producers and
consumers for complete visibility.
+
+2. **Propagate context from HTTP**: When publishing from HTTP endpoints,
always extract and propagate the trace context.
+
+3. **Handle errors properly**: Ensure spans are ended even when exceptions
occur.
+
+4. **Distinguish messaging vs. application errors**:
+ - Messaging operations (nack, timeout) end with OK status and a `messaging.pulsar.acknowledgment.type` attribute
+ - Application failures should be tracked in separate child spans with ERROR
status
+
+5. **Use meaningful span names**: The default span names include the topic
name for easy identification.
+
+6. **Consider performance**: Tracing adds minimal overhead, but in
high-throughput scenarios, consider sampling.
+
+7. **Clean up resources**: Ensure interceptors and OpenTelemetry SDK are
properly closed when shutting down.
+
+## Troubleshooting
+
+### Traces not appearing
+
+1. Verify OpenTelemetry SDK is configured and exporters are set up
+2. Check that interceptors are added to producers/consumers
+3. Verify trace exporter endpoint is reachable
+4. Enable Java Agent debug logging: `-Dotel.javaagent.debug=true`
+
+### Missing parent-child relationships
+
+1. Ensure trace context is being injected via
`TracingProducerBuilder.injectContext()`
+2. Verify message properties contain `traceparent` header
+3. Check that both producer and consumer have tracing interceptors
+
+### High overhead
+
+1. Consider using sampling: `-Dotel.traces.sampler=parentbased_traceidratio
-Dotel.traces.sampler.arg=0.1`
+2. Use batch span processor (default)
+3. Adjust batch processor settings if needed
+
+## Examples
+
+See the following files for complete examples:
+- `OpenTelemetryTracingIntegrationTest.java` - End-to-end integration tests with comprehensive usage examples
+- `OpenTelemetryTracingTest.java` - Unit tests demonstrating API usage
+
+## API Reference
+
+### Main Classes
+
+- `OpenTelemetryProducerInterceptor` - Producer interceptor for tracing
+- `OpenTelemetryConsumerInterceptor` - Consumer interceptor for tracing
+- `TracingContext` - Utility methods for span creation and context propagation
+- `TracingProducerBuilder` - Helper for injecting trace context into messages
+
+## Additional Resources
+
+- [OpenTelemetry Java
Documentation](https://opentelemetry.io/docs/instrumentation/java/)
+- [W3C Trace Context Specification](https://www.w3.org/TR/trace-context/)
+- [Pulsar Documentation](https://pulsar.apache.org/docs/)
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 9bbd9cebca8..7c8529ceeb7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -157,6 +157,12 @@ public class ClientBuilderImpl implements ClientBuilder {
return this;
}
+ @Override
+ public ClientBuilder enableTracing(boolean tracingEnabled) {
+ conf.setTracingEnabled(tracingEnabled);
+ return this;
+ }
+
@Override
public ClientBuilder authentication(String authPluginClassName, String
authParamsString)
throws UnsupportedAuthenticationException {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 749d23651aa..1755c94b0de 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -80,6 +80,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected static final double
MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;
protected final String subscription;
+ @Getter
protected final ConsumerConfigurationData<T> conf;
protected final String consumerName;
protected final CompletableFuture<Consumer<T>> subscribeFuture;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index dc2363c279f..11bf617e2fd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -213,10 +213,22 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
applyDLQConfig = CompletableFuture.completedFuture(null);
}
return applyDLQConfig.thenCompose(__ -> {
- if (interceptorList == null || interceptorList.size() == 0) {
+ // Automatically add tracing interceptor if tracing is enabled
+ List<ConsumerInterceptor<T>> effectiveInterceptors =
interceptorList;
+ if (client.getConfiguration().isTracingEnabled()) {
+ if (effectiveInterceptors == null) {
+ effectiveInterceptors = new java.util.ArrayList<>();
+ } else {
+ effectiveInterceptors = new
java.util.ArrayList<>(effectiveInterceptors);
+ }
+ effectiveInterceptors.add(
+ new
org.apache.pulsar.client.impl.tracing.OpenTelemetryConsumerInterceptor<>());
+ }
+
+ if (effectiveInterceptors == null || effectiveInterceptors.size()
== 0) {
return client.subscribeAsync(conf, schema, null);
} else {
- return client.subscribeAsync(conf, schema, new
ConsumerInterceptors<>(interceptorList));
+ return client.subscribeAsync(conf, schema, new
ConsumerInterceptors<>(effectiveInterceptors));
}
});
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 8cffba44dc5..6446a9b60da 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -25,14 +25,23 @@ import java.io.IOException;
import java.util.Objects;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.TraceableMessageId;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
-public class MessageIdImpl implements MessageIdAdv {
+public class MessageIdImpl implements MessageIdAdv, TraceableMessageId {
+ private static final long serialVersionUID = 1L;
+
protected final long ledgerId;
protected final long entryId;
protected final int partitionIndex;
+ /**
+ * OpenTelemetry tracing span associated with this message ID.
+ * Used for distributed tracing support via the TraceableMessageId
interface.
+ */
+ private transient io.opentelemetry.api.trace.Span tracingSpan;
+
// Private constructor used only for json deserialization
@SuppressWarnings("unused")
private MessageIdImpl() {
@@ -188,4 +197,16 @@ public class MessageIdImpl implements MessageIdAdv {
// there is no message batch so we pass -1
return toByteArray(-1, 0);
}
+
+ // TraceableMessageId implementation for OpenTelemetry support
+
+ @Override
+ public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+ this.tracingSpan = span;
+ }
+
+ @Override
+ public io.opentelemetry.api.trace.Span getTracingSpan() {
+ return this.tracingSpan;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 4d7b6cc4734..c964db57505 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.TraceableMessage;
import org.apache.pulsar.client.impl.schema.AbstractSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
@@ -60,7 +61,7 @@ import org.apache.pulsar.common.schema.SchemaIdUtil;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-public class MessageImpl<T> implements Message<T> {
+public class MessageImpl<T> implements TraceableMessage, Message<T> {
protected MessageId messageId;
private final MessageMetadata msgMetadata;
@@ -84,6 +85,13 @@ public class MessageImpl<T> implements Message<T> {
private boolean poolMessage;
@Getter
private long consumerEpoch;
+
+ /**
+ * OpenTelemetry tracing span associated with this message.
+ * Used for distributed tracing support via the TraceableMessage interface.
+ */
+ private transient io.opentelemetry.api.trace.Span tracingSpan;
+
// Constructor for out-going message
public static <T> MessageImpl<T> create(MessageMetadata msgMetadata,
ByteBuffer payload, Schema<T> schema,
String topic) {
@@ -844,6 +852,18 @@ public class MessageImpl<T> implements Message<T> {
return payload;
}
+ // TraceableMessage implementation for OpenTelemetry support
+
+ @Override
+ public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+ this.tracingSpan = span;
+ }
+
+ @Override
+ public io.opentelemetry.api.trace.Span getTracingSpan() {
+ return this.tracingSpan;
+ }
+
enum SchemaState {
None, Ready, Broken
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 273880569c3..e0ec16f507e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -22,6 +22,7 @@ import static
org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMess
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timeout;
import io.netty.util.Timer;
+import io.opentelemetry.api.trace.Span;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
@@ -35,6 +36,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.RedeliveryBackoff;
+import org.apache.pulsar.client.api.TraceableMessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
@@ -47,6 +49,7 @@ class NegativeAcksTracker implements Closeable {
// different timestamp, there will be multiple entries in the map
// RB Tree -> LongOpenHashMap -> Roaring64Bitmap
private Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
nackedMessages = null;
+ private final Long2ObjectMap<Long2ObjectMap<MessageId>> nackedMessageIds =
new Long2ObjectOpenHashMap<>();
private final ConsumerBase<?> consumer;
private final Timer timer;
@@ -89,7 +92,17 @@ class NegativeAcksTracker implements Closeable {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entrySet = ledgerEntry.getValue();
entrySet.forEach(entryId -> {
- MessageId msgId = new MessageIdImpl(ledgerId, entryId,
DUMMY_PARTITION_INDEX);
+ MessageId msgId = null;
+ Long2ObjectMap<MessageId> entryMap =
nackedMessageIds.get(ledgerId);
+ if (entryMap != null) {
+ msgId = entryMap.remove(entryId);
+ if (entryMap.isEmpty()) {
+ nackedMessageIds.remove(ledgerId);
+ }
+ }
+ if (msgId == null) {
+ msgId = new MessageIdImpl(ledgerId, entryId,
DUMMY_PARTITION_INDEX);
+ }
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId,
messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
});
@@ -143,6 +156,15 @@ class NegativeAcksTracker implements Closeable {
}
private synchronized void add(MessageId messageId, int redeliveryCount) {
+ if (messageId instanceof TraceableMessageId) {
+ Span span = ((TraceableMessageId) messageId).getTracingSpan();
+ if (span != null) {
+ MessageIdAdv msgId = (MessageIdAdv) messageId;
+ nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new
Long2ObjectOpenHashMap<>())
+ .put(msgId.getEntryId(), messageId);
+ }
+ }
+
if (nackedMessages == null) {
nackedMessages = new Long2ObjectAVLTreeMap<>();
}
@@ -201,5 +223,8 @@ class NegativeAcksTracker implements Closeable {
nackedMessages.clear();
nackedMessages = null;
}
+ if (nackedMessageIds != null) {
+ nackedMessageIds.clear();
+ }
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 7c33cba9645..e176cc41bc6 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -106,9 +106,20 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
return FutureUtil.failedFuture(pce);
}
- return interceptorList == null || interceptorList.size() == 0
+ // Automatically add tracing interceptor if tracing is enabled
+ List<ProducerInterceptor> effectiveInterceptors = interceptorList;
+ if (client.getConfiguration().isTracingEnabled()) {
+ if (effectiveInterceptors == null) {
+ effectiveInterceptors = new ArrayList<>();
+ } else {
+ effectiveInterceptors = new ArrayList<>(effectiveInterceptors);
+ }
+ effectiveInterceptors.add(new
org.apache.pulsar.client.impl.tracing.OpenTelemetryProducerInterceptor());
+ }
+
+ return effectiveInterceptors == null || effectiveInterceptors.size()
== 0
? client.createProducerAsync(conf, schema, null)
- : client.createProducerAsync(conf, schema, new
ProducerInterceptors(interceptorList));
+ : client.createProducerAsync(conf, schema, new
ProducerInterceptors(effectiveInterceptors));
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 3a2ff97f51e..605757c230c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1403,7 +1403,7 @@ public class PulsarClientImpl implements PulsarClient {
return scheduledExecutorProvider;
}
- InstrumentProvider instrumentProvider() {
+ public InstrumentProvider instrumentProvider() {
return instrumentProvider;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
index 3dc9b23e93e..872fe283fb9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -22,8 +22,9 @@ import java.util.BitSet;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.TopicMessageId;
+import org.apache.pulsar.client.api.TraceableMessageId;
-public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId {
+public class TopicMessageIdImpl implements MessageIdAdv, TopicMessageId,
TraceableMessageId {
private final String ownerTopic;
private final MessageIdAdv msgId;
@@ -129,4 +130,22 @@ public class TopicMessageIdImpl implements MessageIdAdv,
TopicMessageId {
public String toString() {
return msgId.toString();
}
+
+ // TraceableMessageId implementation for OpenTelemetry support
+ // Delegates to the wrapped MessageIdAdv if it implements
TraceableMessageId
+
+ @Override
+ public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+ if (msgId instanceof TraceableMessageId) {
+ ((TraceableMessageId) msgId).setTracingSpan(span);
+ }
+ }
+
+ @Override
+ public io.opentelemetry.api.trace.Span getTracingSpan() {
+ if (msgId instanceof TraceableMessageId) {
+ return ((TraceableMessageId) msgId).getTracingSpan();
+ }
+ return null;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 7b9916b58fc..19533b21601 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -18,15 +18,17 @@
*/
package org.apache.pulsar.client.impl;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Map;
import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TraceableMessage;
import org.apache.pulsar.common.api.EncryptionContext;
-public class TopicMessageImpl<T> implements Message<T> {
+public class TopicMessageImpl<T> implements TraceableMessage, Message<T> {
/** This topicPartitionName is get from ConsumerImpl, it contains
partition part. */
private final String topicPartitionName;
@@ -65,6 +67,7 @@ public class TopicMessageImpl<T> implements Message<T> {
}
@Override
+ @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "messageId is
immutable")
public MessageId getMessageId() {
return messageId;
}
@@ -226,4 +229,22 @@ public class TopicMessageImpl<T> implements Message<T> {
return msg.getIndex();
}
+ // TraceableMessage implementation for OpenTelemetry support
+ // Delegates to the wrapped message if it implements TraceableMessage
+
+    /**
+     * Stores the span on the wrapped message. When the wrapped message is not a
+     * {@link TraceableMessage} the call is ignored.
+     */
+    @Override
+    public void setTracingSpan(io.opentelemetry.api.trace.Span span) {
+        if (!(msg instanceof TraceableMessage)) {
+            return;
+        }
+        TraceableMessage traceable = (TraceableMessage) msg;
+        traceable.setTracingSpan(span);
+    }
+
+    /**
+     * Returns the span held by the wrapped message, or {@code null} when the
+     * wrapped message is not a {@link TraceableMessage}.
+     */
+    @Override
+    public io.opentelemetry.api.trace.Span getTracingSpan() {
+        return msg instanceof TraceableMessage
+                ? ((TraceableMessage) msg).getTracingSpan()
+                : null;
+    }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index e406581e707..df6e01a73f5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -437,6 +437,13 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
private transient OpenTelemetry openTelemetry;
+ @ApiModelProperty(
+ name = "tracingEnabled",
+ value = "Whether to enable OpenTelemetry distributed tracing. When
enabled, "
+ + "tracing interceptors are automatically added to
producers and consumers."
+ )
+ private boolean tracingEnabled = false;
+
/**
* Gets the authentication settings for the client.
*
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
index a0bdd8b6fb6..d73baef3a0c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
@@ -24,6 +24,7 @@ import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import io.opentelemetry.api.trace.Tracer;
import java.util.function.Consumer;
import org.apache.pulsar.PulsarVersion;
@@ -32,6 +33,7 @@ public class InstrumentProvider {
public static final InstrumentProvider NOOP = new
InstrumentProvider(OpenTelemetry.noop());
private final Meter meter;
+ private final Tracer tracer;
public InstrumentProvider(OpenTelemetry otel) {
if (otel == null) {
@@ -43,6 +45,10 @@ public class InstrumentProvider {
.meterBuilder("org.apache.pulsar.client")
.setInstrumentationVersion(PulsarVersion.getVersion())
.build();
+ this.tracer = otel.getTracerProvider()
+ .tracerBuilder("org.apache.pulsar.client")
+ .setInstrumentationVersion(PulsarVersion.getVersion())
+ .build();
}
public Counter newCounter(String name, Unit unit, String description,
String topic, Attributes attributes) {
@@ -63,4 +69,8 @@ public class InstrumentProvider {
Consumer<ObservableLongMeasurement> callback) {
return new ObservableUpDownCounter(meter, name, unit, description,
topic, attributes, callback);
}
+
+ /**
+ * Returns the tracer this provider built in its constructor under the
+ * instrumentation scope name {@code org.apache.pulsar.client}.
+ *
+ * @return the tracer held by this provider
+ */
+ public Tracer getTracer() {
+ return tracer;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
new file mode 100644
index 00000000000..eece10b76ff
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryConsumerInterceptor.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicMessageId;
+import org.apache.pulsar.client.api.TraceableMessageId;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OpenTelemetry consumer interceptor that creates spans for message
consumption.
+ * <p>
+ * This interceptor automatically retrieves the Tracer from the client's
InstrumentProvider,
+ * ensuring consistent OpenTelemetry configuration across the client.
+ * <p>
+ * <b>Span Storage Strategy:</b>
+ * <ul>
+ * <li><b>Shared/Key_Shared subscriptions:</b> Spans are attached directly
to {@link TraceableMessageId}
+ * instances with zero map overhead.</li>
+ * <li><b>Failover/Exclusive subscriptions:</b> A nested map is initialized
eagerly to track message IDs
+ * and their spans in sorted order. This is necessary because cumulative
ack must end spans
+ * for all messages up to the acked position.</li>
+ * </ul>
+ * <p>
+ * <b>Multi-Topic Consumer Support:</b><br>
+ * For {@link org.apache.pulsar.client.impl.MultiTopicsConsumerImpl} and
pattern-based consumers, cumulative
+ * acknowledgment only affects messages from the same topic partition. The
interceptor uses a nested
+ * map structure (topic partition → message IDs) and {@link
TopicMessageId#getOwnerTopic()} to ensure
+ * spans are only ended for messages from the acknowledged topic partition.
+ */
+public class OpenTelemetryConsumerInterceptor<T> implements
ConsumerInterceptor<T> {
+
+ private static final Logger log =
LoggerFactory.getLogger(OpenTelemetryConsumerInterceptor.class);
+
+ private Tracer tracer;
+ private TextMapPropagator propagator;
+ private String topic;
+ private String subscription;
+ private boolean initialized = false;
+
+ /**
+ * Used for cumulative acknowledgment support (Failover/Exclusive
subscriptions).
+ * Outer map: topic partition -> (message ID -> span)
+ * Inner ConcurrentSkipListMap maintains sorted order for efficient range
operations.
+ * Initialized eagerly for Failover/Exclusive subscriptions.
+ * <p>
+ * The nested structure is necessary for multi-topic consumers where a
single interceptor
+ * instance handles messages from multiple topic partitions. Cumulative
ack only affects
+ * messages from the same topic partition.
+ */
+ private volatile Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>>
messageSpansByTopic;
+
+ public OpenTelemetryConsumerInterceptor() {
+ // Tracer and propagator will be initialized in beforeConsume when we
have access to the consumer
+ }
+
+    /**
+     * Resolves the key under which spans for the given message ID are grouped.
+     * A {@link TopicMessageId} is keyed by its owner topic; any other ID is keyed
+     * by this consumer's topic, or by the empty string while that is still unset.
+     */
+    private String getTopicKey(MessageId messageId) {
+        if (!(messageId instanceof TopicMessageId)) {
+            if (topic == null) {
+                return "";
+            }
+            return topic;
+        }
+        return ((TopicMessageId) messageId).getOwnerTopic();
+    }
+
+    /**
+     * One-time setup, driven by the first consumer that is a {@link ConsumerBase}.
+     * <p>
+     * The tracer is taken from the owning client's {@link InstrumentProvider};
+     * the propagator is taken from {@link GlobalOpenTelemetry}. For Exclusive and
+     * Failover subscriptions the per-topic span map is created as well.
+     * Consumers of any other type leave the interceptor uninitialized.
+     */
+    private void initializeIfNeeded(Consumer<T> consumer) {
+        if (initialized || !(consumer instanceof ConsumerBase<?> consumerBase)) {
+            return;
+        }
+
+        PulsarClientImpl client = consumerBase.getClient();
+        InstrumentProvider provider = client.instrumentProvider();
+        this.tracer = provider.getTracer();
+        this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+        this.initialized = true;
+
+        SubscriptionType subscriptionType = consumerBase.getConf().getSubscriptionType();
+        boolean needsSpanMap = subscriptionType == SubscriptionType.Exclusive
+                || subscriptionType == SubscriptionType.Failover;
+        if (needsSpanMap) {
+            ensureMapInitialized();
+        }
+    }
+
+ /**
+ * Ensure the map is initialized for cumulative acknowledgment support.
+ * Called from {@code initializeIfNeeded} when the consumer's subscription type
+ * is Exclusive or Failover; for other subscription types the map stays null.
+ */
+ private void ensureMapInitialized() {
+ if (messageSpansByTopic == null) {
+ messageSpansByTopic = new ConcurrentHashMap<>();
+ log.debug("Initialized message spans map for cumulative
acknowledgment support");
+ }
+ }
+
+    /**
+     * Ends every span still held in the per-topic map and empties the map.
+     * Does nothing when the map was never created.
+     */
+    @Override
+    public void close() {
+        Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> spansByTopic = messageSpansByTopic;
+        if (spansByTopic == null) {
+            return;
+        }
+        for (ConcurrentSkipListMap<MessageIdAdv, Span> perTopic : spansByTopic.values()) {
+            for (Span pending : perTopic.values()) {
+                TracingContext.endSpan(pending);
+            }
+        }
+        spansByTopic.clear();
+    }
+
+ /**
+ * Starts a consumer span for the incoming message and always returns the
+ * message unchanged.
+ * <p>
+ * A valid span is put into the per-topic map when that map exists and the
+ * message ID is a {@link MessageIdAdv}, and is attached to the message ID when
+ * the ID is a {@link TraceableMessageId}. Any exception is logged and not
+ * propagated.
+ * <p>
+ * The topic and subscription passed to the span are read from the consumer on
+ * the first call and cached, so they are the consumer's values rather than
+ * the individual message's.
+ * NOTE(review): a valid span whose message ID is neither stored in the map nor
+ * traceable is not referenced anywhere afterwards, so nothing in this class
+ * ends it.
+ */
+ @Override
+ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
+ // Initialize tracer from consumer on first call
+ initializeIfNeeded(consumer);
+
+ if (tracer == null || propagator == null) {
+ return message;
+ }
+
+ try {
+ if (topic == null) {
+ topic = consumer.getTopic();
+ }
+ if (subscription == null) {
+ subscription = consumer.getSubscription();
+ }
+
+ // Create a consumer span for this message
+ Span span = TracingContext.createConsumerSpan(tracer, topic,
subscription, message, propagator);
+
+ if (TracingContext.isValid(span)) {
+ MessageId messageId = message.getMessageId();
+
+ // Store in map for cumulative ack support (Failover/Exclusive)
+ if (messageSpansByTopic != null && messageId instanceof
MessageIdAdv) {
+ String topicKey = getTopicKey(messageId);
+ messageSpansByTopic.computeIfAbsent(topicKey,
+ k -> new
ConcurrentSkipListMap<>()).put((MessageIdAdv) messageId, span);
+ }
+
+ // Always attach span to message ID for individual ack/nack
+ if (messageId instanceof TraceableMessageId) {
+ ((TraceableMessageId) messageId).setTracingSpan(span);
+ }
+
+ log.debug("Created consumer span for message {} on topic {}",
messageId, topic);
+ }
+ } catch (Exception e) {
+ log.error("Error creating consumer span", e);
+ }
+
+ return message;
+ }
+
+    /**
+     * Ends the span attached to an individually acknowledged message ID.
+     * <p>
+     * IDs that are not {@link TraceableMessageId}, or that carry no span, are
+     * ignored. A failed acknowledgment ends the span with the exception; a
+     * successful one tags the span with acknowledgment type {@code acknowledge}
+     * first. Afterwards the span is cleared from the ID and, when the per-topic
+     * map exists, the ID is removed from it. Exceptions are logged, not thrown.
+     */
+    @Override
+    public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
+        if (!(messageId instanceof TraceableMessageId traceableId)) {
+            return;
+        }
+        Span span = traceableId.getTracingSpan();
+        if (span == null) {
+            return;
+        }
+
+        try {
+            if (exception == null) {
+                // Add attribute to indicate acknowledgment type
+                span.setAttribute("messaging.pulsar.acknowledgment.type", "acknowledge");
+                TracingContext.endSpan(span);
+            } else {
+                TracingContext.endSpan(span, exception);
+            }
+            traceableId.setTracingSpan(null);
+
+            // Drop the bookkeeping entry kept for Failover/Exclusive subscriptions
+            if (messageSpansByTopic != null && messageId instanceof MessageIdAdv advId) {
+                ConcurrentSkipListMap<MessageIdAdv, Span> perTopic =
+                        messageSpansByTopic.get(getTopicKey(messageId));
+                if (perTopic != null) {
+                    perTopic.remove(advId);
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error ending consumer span on acknowledge", e);
+        }
+    }
+
+    /**
+     * Ends the spans covered by a cumulative acknowledgment.
+     * <p>
+     * When the acknowledged ID is a {@link MessageIdAdv} and a span map exists for
+     * its topic key, every tracked span whose message ID compares less than or
+     * equal to the acknowledged position is ended and removed; an emptied
+     * per-topic map is dropped. Afterwards, a span still attached to the
+     * acknowledged ID itself (one the sweep did not reach, or any span when no map
+     * is kept) is ended directly.
+     * <p>
+     * The attached span is read only <em>after</em> the sweep. The sweep clears the
+     * span from each ID it handles, so a span that was already ended through the
+     * map is not ended a second time. Every path tags a successfully acknowledged
+     * span with acknowledgment type {@code cumulative_acknowledge}.
+     * Exceptions raised while ending a span are logged, not thrown.
+     *
+     * @param consumer  the consumer that issued the acknowledgment
+     * @param messageId the cumulative acknowledgment position
+     * @param exception the acknowledgment failure, or {@code null} on success
+     */
+    @Override
+    public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
+        Map<String, ConcurrentSkipListMap<MessageIdAdv, Span>> spansByTopic = messageSpansByTopic;
+        if (spansByTopic != null && messageId instanceof MessageIdAdv cumulativeAckPos) {
+            String topicKey = getTopicKey(messageId);
+            ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans = spansByTopic.get(topicKey);
+            if (topicSpans != null) {
+                endSpansUpTo(topicSpans, cumulativeAckPos, exception);
+                // Clean up empty topic map
+                if (topicSpans.isEmpty()) {
+                    spansByTopic.remove(topicKey);
+                }
+            }
+        }
+
+        // Read the attached span only now: anything the sweep ended has been cleared.
+        if (messageId instanceof TraceableMessageId traceableId) {
+            Span remaining = traceableId.getTracingSpan();
+            if (remaining != null) {
+                try {
+                    endCumulativelyAckedSpan(remaining, messageId, exception);
+                } catch (Exception e) {
+                    log.error("Error ending consumer span on cumulative acknowledge", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Ends and removes, in ascending key order, every entry whose message ID is
+     * less than or equal to the acknowledged position.
+     */
+    private void endSpansUpTo(ConcurrentSkipListMap<MessageIdAdv, Span> topicSpans,
+                              MessageIdAdv cumulativeAckPos, Throwable exception) {
+        Iterator<Map.Entry<MessageIdAdv, Span>> iterator = topicSpans.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<MessageIdAdv, Span> entry = iterator.next();
+            MessageIdAdv msgId = entry.getKey();
+            if (msgId.compareTo(cumulativeAckPos) > 0) {
+                // Since the map is sorted, we can break early
+                break;
+            }
+            try {
+                endCumulativelyAckedSpan(entry.getValue(), msgId, exception);
+            } catch (Exception e) {
+                log.error("Error ending consumer span on cumulative acknowledge for message {}", msgId, e);
+            }
+            iterator.remove();
+        }
+    }
+
+    /**
+     * Ends one span for a cumulative acknowledgment, then clears it from its
+     * owning message ID when that ID is traceable.
+     */
+    private void endCumulativelyAckedSpan(Span span, MessageId owner, Throwable exception) {
+        if (exception != null) {
+            TracingContext.endSpan(span, exception);
+        } else {
+            // Add attribute to indicate acknowledgment type
+            span.setAttribute("messaging.pulsar.acknowledgment.type", "cumulative_acknowledge");
+            TracingContext.endSpan(span);
+        }
+        if (owner instanceof TraceableMessageId traceableOwner) {
+            traceableOwner.setTracingSpan(null);
+        }
+    }
+
+    /**
+     * Ends the span of each negatively acknowledged message ID.
+     * <p>
+     * IDs that are not {@link TraceableMessageId}, or that carry no span, are
+     * skipped. Each span is tagged with acknowledgment type
+     * {@code negative_acknowledge} and ended through the normal (non-error) path,
+     * then cleared from the ID and removed from the per-topic map when that map
+     * exists. Exceptions are logged per message and do not stop the loop.
+     */
+    @Override
+    public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            if (!(messageId instanceof TraceableMessageId traceableId)) {
+                continue;
+            }
+            Span span = traceableId.getTracingSpan();
+            if (span == null) {
+                continue;
+            }
+
+            try {
+                // Negative ack is part of the normal flow, so the span ends without an error status
+                span.setAttribute("messaging.pulsar.acknowledgment.type", "negative_acknowledge");
+                TracingContext.endSpan(span);
+                traceableId.setTracingSpan(null);
+
+                // Drop the bookkeeping entry kept for Failover/Exclusive subscriptions
+                if (messageSpansByTopic != null && messageId instanceof MessageIdAdv advId) {
+                    ConcurrentSkipListMap<MessageIdAdv, Span> perTopic =
+                            messageSpansByTopic.get(getTopicKey(messageId));
+                    if (perTopic != null) {
+                        perTopic.remove(advId);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Error ending consumer span on negative acknowledge", e);
+            }
+        }
+    }
+
+    /**
+     * Ends the span of each message ID whose acknowledgment timed out.
+     * <p>
+     * IDs that are not {@link TraceableMessageId}, or that carry no span, are
+     * skipped. Each span is tagged with acknowledgment type {@code ack_timeout}
+     * and ended through the normal (non-error) path, then cleared from the ID and
+     * removed from the per-topic map when that map exists. Exceptions are logged
+     * per message and do not stop the loop.
+     */
+    @Override
+    public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            if (!(messageId instanceof TraceableMessageId traceableId)) {
+                continue;
+            }
+            Span span = traceableId.getTracingSpan();
+            if (span == null) {
+                continue;
+            }
+
+            try {
+                // Ack timeout is expected behavior, so the span ends without an error status
+                span.setAttribute("messaging.pulsar.acknowledgment.type", "ack_timeout");
+                TracingContext.endSpan(span);
+                traceableId.setTracingSpan(null);
+
+                // Drop the bookkeeping entry kept for Failover/Exclusive subscriptions
+                if (messageSpansByTopic != null && messageId instanceof MessageIdAdv advId) {
+                    ConcurrentSkipListMap<MessageIdAdv, Span> perTopic =
+                            messageSpansByTopic.get(getTopicKey(messageId));
+                    if (perTopic != null) {
+                        perTopic.remove(advId);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Error ending consumer span on ack timeout", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
new file mode 100644
index 00000000000..1dc36566e6e
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TraceableMessage;
+import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
+import org.apache.pulsar.client.impl.ProducerBase;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OpenTelemetry producer interceptor that creates spans for message
publishing.
+ * <p>
+ * This interceptor automatically retrieves the Tracer from the client's
InstrumentProvider,
+ * ensuring consistent OpenTelemetry configuration across the client.
+ * <p>
+ * Spans are attached directly to {@link TraceableMessage} instances,
eliminating the need
+ * for external span tracking via maps.
+ */
+public class OpenTelemetryProducerInterceptor implements ProducerInterceptor {
+
+ private static final Logger log =
LoggerFactory.getLogger(OpenTelemetryProducerInterceptor.class);
+
+ private Tracer tracer;
+ private TextMapPropagator propagator;
+ private String topic;
+ private boolean initialized = false;
+
+ public OpenTelemetryProducerInterceptor() {
+ // Tracer and propagator will be initialized in beforeSend when we
have access to the producer
+ }
+
+    /**
+     * One-time setup, driven by the first producer that is a {@link ProducerBase}.
+     * <p>
+     * The tracer is taken from the owning client's {@link InstrumentProvider};
+     * the propagator is taken from {@link GlobalOpenTelemetry}. Producers of any
+     * other type leave the interceptor uninitialized.
+     */
+    private void initializeIfNeeded(Producer producer) {
+        if (initialized || !(producer instanceof ProducerBase<?> producerBase)) {
+            return;
+        }
+
+        PulsarClientImpl client = producerBase.getClient();
+        InstrumentProvider provider = client.instrumentProvider();
+        this.tracer = provider.getTracer();
+        this.propagator = GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+        this.initialized = true;
+    }
+
+ @Override
+ public void close() {
+ // The producer fails its pending messages when it is being closed,
+ // which triggers the `onSendAcknowledgement` events, so no spans are ended here
+ }
+
+ /**
+ * Reports whether the tracer and propagator have been assigned.
+ * <p>
+ * Both fields are assigned only by {@code initializeIfNeeded}, which this class
+ * calls from {@code beforeSend}, so this returns {@code false} until then.
+ * NOTE(review): if the caller consults this method before invoking
+ * {@code beforeSend} for the first time, initialization would never run --
+ * confirm against the interceptor dispatch code.
+ */
+ @Override
+ public boolean eligible(Message message) {
+ return tracer != null && propagator != null;
+ }
+
+    /**
+     * Starts a producer span for the outgoing message and attaches it to the
+     * message; always returns the message unchanged.
+     * <p>
+     * The span's parent is the context current on the calling thread. The span is
+     * ended later by {@code onSendAcknowledgement}, which finds it on the message.
+     * For that reason no span is started for a message that is not a
+     * {@link TraceableMessage}: it could never be ended. This method only attaches
+     * the span object; it does not write anything into the message properties.
+     * Any exception is logged and not propagated.
+     *
+     * @param producer the producer sending the message
+     * @param message  the message about to be sent
+     * @return the same {@code message} instance
+     */
+    @Override
+    public Message beforeSend(Producer producer, Message message) {
+        // Initialize tracer from producer on first call
+        initializeIfNeeded(producer);
+
+        // Check traceability before starting a span, so no span is left un-ended.
+        if (!eligible(message) || !(message instanceof TraceableMessage traceable)) {
+            return message;
+        }
+
+        try {
+            if (topic == null) {
+                topic = producer.getTopic();
+            }
+
+            // The span is linked to the current context, which may have been set by:
+            // 1. An active span in the current thread (e.g., from HTTP request handling)
+            // 2. Context propagated from upstream services
+            Span span = TracingContext.createProducerSpan(tracer, topic, Context.current());
+
+            if (TracingContext.isValid(span)) {
+                // Attach the span directly to the message
+                traceable.setTracingSpan(span);
+                log.debug("Created producer span for message on topic {}", topic);
+            }
+        } catch (Exception e) {
+            log.error("Error creating producer span", e);
+        }
+
+        return message;
+    }
+
+    /**
+     * Ends the producer span attached to the message once the send completes.
+     * <p>
+     * Messages that are not {@link TraceableMessage}, or that carry no span, are
+     * ignored. When a message ID is supplied it is recorded on the span as
+     * {@code messaging.message.id}. The span ends with the exception when one is
+     * given and normally otherwise, and is then cleared from the message.
+     * Exceptions are logged, not thrown.
+     */
+    @Override
+    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+        if (!(message instanceof TraceableMessage traceable)) {
+            return;
+        }
+        Span span = traceable.getTracingSpan();
+        if (span == null) {
+            return;
+        }
+
+        try {
+            if (msgId != null) {
+                span.setAttribute("messaging.message.id", msgId.toString());
+            }
+
+            if (exception == null) {
+                TracingContext.endSpan(span);
+            } else {
+                TracingContext.endSpan(span, exception);
+            }
+
+            // Clear the span from the message
+            traceable.setTracingSpan(null);
+        } catch (Exception e) {
+            log.error("Error ending producer span", e);
+        }
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
new file mode 100644
index 00000000000..514d02a64e7
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/TracingContext.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.context.propagation.TextMapSetter;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Utility class for managing OpenTelemetry tracing context in Pulsar messages.
+ */
+public class TracingContext {
+
+ private static final TextMapGetter<Map<String, String>> GETTER = new
TextMapGetter<Map<String, String>>() {
+ @Override
+ public Iterable<String> keys(Map<String, String> carrier) {
+ return carrier.keySet();
+ }
+
+ @Nullable
+ @Override
+ public String get(@Nullable Map<String, String> carrier, String key) {
+ return carrier != null ? carrier.get(key) : null;
+ }
+ };
+
+ private static final TextMapSetter<Map<String, String>> SETTER = (carrier,
key, value) -> {
+ if (carrier != null) {
+ carrier.put(key, value);
+ }
+ };
+
+    /**
+     * Reads trace context out of a message's properties.
+     *
+     * @param message the message whose properties carry the context
+     * @param propagator the text map propagator that performs the extraction
+     * @return the extracted context layered on the current context, or the
+     *         current context itself when either argument is {@code null}
+     */
+    public static Context extractContext(Message<?> message, TextMapPropagator propagator) {
+        Context ambient = Context.current();
+        if (message == null || propagator == null) {
+            return ambient;
+        }
+        return propagator.extract(ambient, message.getProperties(), GETTER);
+    }
+
+    /**
+     * Writes trace context into a message builder as message properties.
+     * Does nothing when any argument is {@code null}.
+     *
+     * @param messageBuilder the builder that receives one property per injected entry
+     * @param context the context to inject
+     * @param propagator the text map propagator that performs the injection
+     */
+    public static <T> void injectContext(TypedMessageBuilder<T> messageBuilder, Context context,
+                                         TextMapPropagator propagator) {
+        if (messageBuilder != null && context != null && propagator != null) {
+            // Collect the propagation entries first, then copy them onto the builder.
+            Map<String, String> headers = new HashMap<>();
+            propagator.inject(context, headers, SETTER);
+            headers.forEach(messageBuilder::property);
+        }
+    }
+
+    /**
+     * Starts a {@code PRODUCER} span named {@code "send " + topic}.
+     *
+     * @param tracer the tracer to use; {@code null} yields the invalid span
+     * @param topic the topic name, also recorded as the destination attribute
+     * @param parentContext the parent context; the current context is used when {@code null}
+     * @return the started span, or {@link Span#getInvalid()} without a tracer
+     */
+    public static Span createProducerSpan(Tracer tracer, String topic, @Nullable Context parentContext) {
+        if (tracer == null) {
+            return Span.getInvalid();
+        }
+
+        Context parent = parentContext;
+        if (parent == null) {
+            parent = Context.current();
+        }
+
+        return tracer.spanBuilder("send " + topic)
+                .setParent(parent)
+                .setSpanKind(SpanKind.PRODUCER)
+                .setAttribute("messaging.system", "pulsar")
+                .setAttribute("messaging.destination.name", topic)
+                .setAttribute("messaging.operation.name", "send")
+                .startSpan();
+    }
+
+    /**
+     * Starts a {@code CONSUMER} span named {@code "process " + topic}.
+     * <p>
+     * The parent context is extracted from the message properties via
+     * {@link #extractContext}, which falls back to the current context when the
+     * message or propagator is {@code null}. The {@code messaging.message.id}
+     * attribute is added only when the message and its ID are available, so a
+     * {@code null} message is handled the same way here as during extraction
+     * instead of raising a {@link NullPointerException}.
+     *
+     * @param tracer the tracer to use; {@code null} yields the invalid span
+     * @param topic the topic name
+     * @param subscription the subscription name
+     * @param message the message being consumed (may be {@code null})
+     * @param propagator the text map propagator to use for context extraction
+     * @return the started span, or {@link Span#getInvalid()} without a tracer
+     */
+    public static Span createConsumerSpan(Tracer tracer, String topic, String subscription, Message<?> message,
+                                          TextMapPropagator propagator) {
+        if (tracer == null) {
+            return Span.getInvalid();
+        }
+
+        Context parentContext = extractContext(message, propagator);
+
+        var builder = tracer.spanBuilder("process " + topic)
+                .setParent(parentContext)
+                .setSpanKind(SpanKind.CONSUMER)
+                .setAttribute("messaging.system", "pulsar")
+                .setAttribute("messaging.destination.name", topic)
+                .setAttribute("messaging.destination.subscription.name", subscription)
+                .setAttribute("messaging.operation.name", "process");
+
+        Object messageId = message != null ? message.getMessageId() : null;
+        if (messageId != null) {
+            builder.setAttribute("messaging.message.id", messageId.toString());
+        }
+        return builder.startSpan();
+    }
+
+    /**
+     * Sets status {@code OK} on the span and ends it; a {@code null} span is ignored.
+     *
+     * @param span the span to end
+     */
+    public static void endSpan(Span span) {
+        if (span == null) {
+            return;
+        }
+        span.setStatus(StatusCode.OK);
+        span.end();
+    }
+
+    /**
+     * Mark a span as failed and end it; a {@code null} span is ignored.
+     * <p>
+     * With a throwable, its message becomes the status description and, if the
+     * span is recording, the throwable is recorded on the span. Without one the
+     * span is still marked {@code ERROR} and ended, so a missing cause can no
+     * longer leave the span open by failing on {@code throwable.getMessage()}.
+     *
+     * @param span the span to end
+     * @param throwable the exception that caused the failure (may be {@code null})
+     */
+    public static void endSpan(Span span, Throwable throwable) {
+        if (span == null) {
+            return;
+        }
+        if (throwable == null) {
+            span.setStatus(StatusCode.ERROR);
+        } else {
+            span.setStatus(StatusCode.ERROR, throwable.getMessage());
+            if (span.isRecording()) {
+                span.recordException(throwable);
+            }
+        }
+        span.end();
+    }
+
+    /**
+     * Tells whether a span carries a valid span context.
+     *
+     * @param span the span to check
+     * @return {@code true} only when the span and its context are non-null and
+     *         the context reports itself valid
+     */
+    public static boolean isValid(Span span) {
+        if (span == null) {
+            return false;
+        }
+        SpanContext spanContext = span.getSpanContext();
+        return spanContext != null && spanContext.isValid();
+    }
+
+    /**
+     * Null-safe accessor for a span's context.
+     *
+     * @param span the span
+     * @return the span's context, or {@link SpanContext#getInvalid()} for a {@code null} span
+     */
+    public static SpanContext getSpanContext(Span span) {
+        if (span == null) {
+            return SpanContext.getInvalid();
+        }
+        return span.getSpanContext();
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java
new file mode 100644
index 00000000000..73d650339cb
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/package-info.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * OpenTelemetry tracing support for Pulsar Java client.
+ *
+ * <h2>Overview</h2>
+ * This package provides OpenTelemetry distributed tracing capabilities for Pulsar producers and consumers.
+ * It automatically creates spans for message publishing, consumption, and acknowledgment operations,
+ * and propagates trace context across services using message properties.
+ *
+ * <h2>Producer Tracing</h2>
+ * Producer tracing tracks:
+ * <ul>
+ * <li><b>publish</b> - Span created when send is called</li>
+ * <li><b>published</b> - Span completed when broker confirms receipt</li>
+ * </ul>
+ *
+ * <h3>Basic Producer Example</h3>
+ * <pre>{@code
+ * // Configure OpenTelemetry (or use auto-instrumentation)
+ * OpenTelemetry openTelemetry = ...;
+ *
+ * // Create producer with tracing interceptor
+ * Producer<String> producer = client.newProducer(Schema.STRING)
+ * .topic("my-topic")
+ * .intercept(new OpenTelemetryProducerInterceptor())
+ * .create();
+ *
+ * // Send message - trace context is automatically propagated
+ * producer.newMessage()
+ * .value("Hello World")
+ * .send();
+ * }</pre>
+ *
+ * <p>
+ * Trace context is automatically injected into message properties from the current thread's context.
+ * This means if your code is running within a traced HTTP request or any other active span,
+ * the trace will automatically continue through Pulsar messages.
+ *
+ * <h2>Consumer Tracing</h2>
+ * Consumer tracing tracks:
+ * <ul>
+ * <li><b>consume</b> - Span created when message is received</li>
+ * <li><b>ack</b> - Span completed when message is acknowledged</li>
+ * <li><b>nack</b> - Span completed when message is negatively acknowledged</li>
+ * </ul>
+ *
+ * <h3>Basic Consumer Example</h3>
+ * <pre>{@code
+ * // Create consumer with tracing interceptor
+ * Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ * .topic("my-topic")
+ * .subscriptionName("my-subscription")
+ * .intercept(new OpenTelemetryConsumerInterceptor<>())
+ * .subscribe();
+ *
+ * // Receive and process messages - trace context is automatically extracted
+ * while (true) {
+ * Message<String> msg = consumer.receive();
+ * try {
+ * // Process message
+ * System.out.println("Received: " + msg.getValue());
+ * consumer.acknowledge(msg);
+ * } catch (Exception e) {
+ * consumer.negativeAcknowledge(msg);
+ * }
+ * }
+ * }</pre>
+ *
+ * <h2>End-to-End Tracing Example</h2>
+ * <pre>{@code
+ * // Service 1: HTTP endpoint that publishes to Pulsar
+ * // When using auto-instrumentation or OpenTelemetry SDK, the HTTP request
+ * // will have an active span context that automatically propagates to Pulsar
+ * @POST
+ * @Path("/publish")
+ * public Response publishMessage(String body) {
+ * // Send message - trace context automatically injected!
+ * producer.newMessage()
+ * .value(body)
+ * .send();
+ *
+ * return Response.ok().build();
+ * }
+ *
+ * // Service 2: Consumer that processes messages
+ * Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ * .topic("my-topic")
+ * .subscriptionName("my-subscription")
+ * .intercept(new OpenTelemetryConsumerInterceptor<>())
+ * .subscribe();
+ *
+ * // Process messages - trace continues from HTTP request
+ * Message<String> msg = consumer.receive();
+ * // Trace context is automatically extracted from message properties
+ * processMessage(msg.getValue());
+ * consumer.acknowledge(msg);
+ *
+ * // The entire flow from HTTP request -> Producer -> Consumer is now traced!
+ * }</pre>
+ *
+ * <h2>Configuration</h2>
+ * OpenTelemetry can be configured via:
+ * <ul>
+ * <li>Java Agent auto-instrumentation</li>
+ * <li>Environment variables (OTEL_*)</li>
+ * <li>System properties (otel.*)</li>
+ * <li>Programmatic configuration</li>
+ * </ul>
+ *
+ * <h2>Trace Context Propagation</h2>
+ * Trace context is propagated using W3C TraceContext format via message properties:
+ * <ul>
+ * <li><b>traceparent</b> - Contains trace ID, span ID, and trace flags</li>
+ * <li><b>tracestate</b> - Contains vendor-specific trace information</li>
+ * </ul>
+ *
+ * @see OpenTelemetryProducerInterceptor
+ * @see OpenTelemetryConsumerInterceptor
+ * @see TracingContext
+ */
+package org.apache.pulsar.client.impl.tracing;
\ No newline at end of file
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index 8b031fbd38b..0ca83d50a96 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -101,9 +103,9 @@ public class ConsumerBuilderImplTest {
@Test
public void testConsumerBuilderImpl() throws PulsarClientException {
Consumer consumer = mock(Consumer.class);
- when(consumerBuilderImpl.subscribeAsync())
- .thenReturn(CompletableFuture.completedFuture(consumer));
- assertNotNull(consumerBuilderImpl.topic(TOPIC_NAME).subscribe());
+ ConsumerBuilderImpl spyBuilder = spy(consumerBuilderImpl);
+
doReturn(CompletableFuture.completedFuture(consumer)).when(spyBuilder).subscribeAsync();
+ assertNotNull(spyBuilder.topic(TOPIC_NAME).subscribe());
}
@Test(expectedExceptions = IllegalArgumentException.class)
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
index bd96cf27aee..45e31e00b4c 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.testng.annotations.BeforeClass;
@@ -54,6 +55,7 @@ public class ProducerBuilderImplTest {
client = mock(PulsarClientImpl.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(client.getCnxPool()).thenReturn(connectionPool);
+ when(client.getConfiguration()).thenReturn(new
ClientConfigurationData());
producerBuilderImpl = new ProducerBuilderImpl<>(client, Schema.BYTES);
when(client.newProducer()).thenReturn(producerBuilderImpl);
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java
new file mode 100644
index 00000000000..bbf92b9ffb1
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryTracingTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the {@link TracingContext} helpers, run against an OpenTelemetry SDK whose spans are
+ * collected by an {@link InMemorySpanExporter}.
+ *
+ * <p>All tests share a single exporter instance created once per class; each test resets it before
+ * creating spans, so the assertions on the number of finished spans assume the tests are not interleaved.
+ */
+public class OpenTelemetryTracingTest {
+
+    private InMemorySpanExporter spanExporter;
+    private OpenTelemetrySdk openTelemetry;
+    private Tracer tracer;
+    private TextMapPropagator propagator;
+
+    /** Builds the SDK, wiring a {@link SimpleSpanProcessor} to the in-memory exporter. */
+    @BeforeClass
+    public void setup() {
+        spanExporter = InMemorySpanExporter.create();
+        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
+                .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
+                .build();
+
+        openTelemetry = OpenTelemetrySdk.builder()
+                .setTracerProvider(tracerProvider)
+                .build();
+
+        tracer = openTelemetry.getTracer("test-tracer");
+        propagator = openTelemetry.getPropagators().getTextMapPropagator();
+    }
+
+    /** Closes the SDK if {@link #setup()} managed to create it. */
+    @AfterClass
+    public void tearDown() {
+        if (openTelemetry != null) {
+            openTelemetry.close();
+        }
+    }
+
+    /** A producer span created without a parent is recording, valid, and carries the messaging attributes. */
+    @Test
+    public void testCreateProducerSpan() {
+        spanExporter.reset();
+
+        String topic = "test-topic";
+        Span span = TracingContext.createProducerSpan(tracer, topic, null);
+
+        assertNotNull(span);
+        assertTrue(span.isRecording());
+        assertTrue(TracingContext.isValid(span));
+
+        TracingContext.endSpan(span);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 1);
+
+        SpanData spanData = spans.get(0);
+        assertEquals(spanData.getName(), "send " + topic);
+        assertEquals(spanData.getKind(), SpanKind.PRODUCER);
+        assertEquals(stringAttribute(spanData, "messaging.system"), "pulsar");
+        assertEquals(stringAttribute(spanData, "messaging.destination.name"), topic);
+        assertEquals(stringAttribute(spanData, "messaging.operation.name"), "send");
+    }
+
+    /** A consumer span built from a received message carries topic, subscription and operation attributes. */
+    @Test
+    public void testCreateConsumerSpan() {
+        spanExporter.reset();
+
+        String topic = "test-topic";
+        String subscription = "test-sub";
+        Map<String, String> properties = new HashMap<>();
+        properties.put("test-key", "test-value");
+
+        Message<?> message = createTestMessage(properties);
+
+        Span span = TracingContext.createConsumerSpan(tracer, topic, subscription, message, propagator);
+
+        assertNotNull(span);
+        assertTrue(span.isRecording());
+        assertTrue(TracingContext.isValid(span));
+
+        TracingContext.endSpan(span);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 1);
+
+        SpanData spanData = spans.get(0);
+        assertEquals(spanData.getName(), "process " + topic);
+        assertEquals(spanData.getKind(), SpanKind.CONSUMER);
+        assertEquals(stringAttribute(spanData, "messaging.system"), "pulsar");
+        assertEquals(stringAttribute(spanData, "messaging.destination.name"), topic);
+        assertEquals(stringAttribute(spanData, "messaging.destination.subscription.name"), subscription);
+        assertEquals(stringAttribute(spanData, "messaging.operation.name"), "process");
+    }
+
+    /** Ending a span with a throwable yields ERROR status, the throwable's message, and at least one event. */
+    @Test
+    public void testSpanWithException() {
+        spanExporter.reset();
+
+        String topic = "test-topic";
+        Span span = TracingContext.createProducerSpan(tracer, topic, null);
+
+        RuntimeException exception = new RuntimeException("Test exception");
+        TracingContext.endSpan(span, exception);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 1);
+
+        SpanData spanData = spans.get(0);
+        assertEquals(spanData.getStatus().getStatusCode(), io.opentelemetry.api.trace.StatusCode.ERROR);
+        assertEquals(spanData.getStatus().getDescription(), "Test exception");
+        assertFalse(spanData.getEvents().isEmpty());
+    }
+
+    /** A producer span created from the current context becomes a child of the span active in that context. */
+    @Test
+    public void testContextPropagation() {
+        spanExporter.reset();
+
+        String topic = "test-topic";
+
+        // Create a parent span and make it current while the producer span is created
+        Span parentSpan = tracer.spanBuilder("parent").startSpan();
+        try (Scope ignored = parentSpan.makeCurrent()) {
+            Span producerSpan = TracingContext.createProducerSpan(tracer, topic, Context.current());
+
+            assertNotNull(producerSpan);
+            assertTrue(TracingContext.isValid(producerSpan));
+
+            TracingContext.endSpan(producerSpan);
+        } finally {
+            parentSpan.end();
+        }
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(spans.size(), 2);
+
+        // Look the spans up by name so the check does not depend on the order in which they were exported
+        SpanData producerSpanData = findSpan(spans, "send " + topic);
+        SpanData parentSpanData = findSpan(spans, "parent");
+
+        assertEquals(producerSpanData.getParentSpanId(), parentSpanData.getSpanId());
+    }
+
+    /** Reads the string-typed attribute {@code key} from {@code spanData}. */
+    private static String stringAttribute(SpanData spanData, String key) {
+        return spanData.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey(key));
+    }
+
+    /** Returns the first finished span called {@code name}, failing the test if there is none. */
+    private static SpanData findSpan(List<SpanData> spans, String name) {
+        return spans.stream()
+                .filter(candidate -> name.equals(candidate.getName()))
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("No finished span named '" + name + "'"));
+    }
+
+    /**
+     * Builds a message on topic {@code test-topic} with payload {@code "test"}, the given properties,
+     * and a fixed message id.
+     */
+    private Message<?> createTestMessage(Map<String, String> properties) {
+        // Create a simple MessageMetadata with properties
+        org.apache.pulsar.common.api.proto.MessageMetadata metadata =
+                new org.apache.pulsar.common.api.proto.MessageMetadata();
+
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            metadata.addProperty()
+                    .setKey(entry.getKey())
+                    .setValue(entry.getValue());
+        }
+
+        // Create a message with metadata; encode the payload with an explicit charset so the bytes
+        // do not depend on the platform default
+        MessageImpl<?> message = MessageImpl.create(
+                metadata,
+                java.nio.ByteBuffer.wrap("test".getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+                org.apache.pulsar.client.api.Schema.BYTES,
+                "test-topic"
+        );
+
+        // Set a MessageId on the message
+        message.setMessageId(new org.apache.pulsar.client.impl.MessageIdImpl(1L, 1L, -1));
+
+        return message;
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java
new file mode 100644
index 00000000000..d50c45a1df5
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/tracing/TracingExampleTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.tracing;
+
+import org.testng.annotations.Test;
+
+/**
+ * Example test demonstrating OpenTelemetry tracing usage patterns.
+ * These are code examples for documentation purposes.
+ *
+ * <p>Every method is annotated with {@code @Test(enabled = false)} and its body consists solely of
+ * commented-out sample code, so nothing in this class executes.
+ */
+public class TracingExampleTest {
+
+ /**
+ * Example 1: Basic producer with tracing.
+ * The tracing interceptor is added explicitly on the producer builder.
+ */
+ @Test(enabled = false)
+ public void exampleBasicProducerTracing() throws Exception {
+ // Configure OpenTelemetry (or use auto-instrumentation)
+ // OpenTelemetry openTelemetry = ...;
+
+ // Create Pulsar client
+ // PulsarClient client = PulsarClient.builder()
+ //         .serviceUrl("pulsar://localhost:6650")
+ //         .build();
+
+ // Create producer with tracing interceptor
+ // Producer<String> producer = client.newProducer(Schema.STRING)
+ //         .topic("my-topic")
+ //         .intercept(new OpenTelemetryProducerInterceptor())
+ //         .create();
+
+ // Send message - trace context is automatically propagated
+ // producer.newMessage()
+ //         .value("Hello World")
+ //         .send();
+ }
+
+ /**
+ * Example 2: Producer with automatic tracing.
+ * Tracing is switched on at the client builder, so no interceptor is added on the producer.
+ */
+ @Test(enabled = false)
+ public void exampleProducerWithAutomaticTracing() throws Exception {
+ // Create Pulsar client with tracing enabled
+ // PulsarClient client = PulsarClient.builder()
+ //         .serviceUrl("pulsar://localhost:6650")
+ //         .openTelemetry(openTelemetry, true) // Enable automatic tracing
+ //         .build();
+
+ // Producer automatically has tracing enabled
+ // Producer<String> producer = client.newProducer(Schema.STRING)
+ //         .topic("my-topic")
+ //         .create();
+
+ // Send message - trace context is automatically injected
+ // producer.newMessage()
+ //         .value("Hello World")
+ //         .send();
+ }
+
+ /**
+ * Example 3: Basic consumer with tracing.
+ * The tracing interceptor is added explicitly on the consumer builder; messages are acknowledged on
+ * success and negatively acknowledged when processing throws.
+ */
+ @Test(enabled = false)
+ public void exampleBasicConsumerTracing() throws Exception {
+ // Create Pulsar client
+ // PulsarClient client = PulsarClient.builder()
+ //         .serviceUrl("pulsar://localhost:6650")
+ //         .build();
+
+ // Create consumer with tracing interceptor
+ // Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ //         .topic("my-topic")
+ //         .subscriptionName("my-subscription")
+ //         .intercept(new OpenTelemetryConsumerInterceptor<>())
+ //         .subscribe();
+
+ // Receive and process messages - trace context is automatically extracted
+ // while (true) {
+ //     Message<String> msg = consumer.receive();
+ //     try {
+ //         // Process message
+ //         System.out.println("Received: " + msg.getValue());
+ //         consumer.acknowledge(msg);
+ //     } catch (Exception e) {
+ //         consumer.negativeAcknowledge(msg);
+ //     }
+ // }
+ }
+
+ /**
+ * Example 4: End-to-end tracing from HTTP to Pulsar (automatic).
+ * Shows the producing side inside an instrumented HTTP service and the consuming side in a
+ * separate service.
+ */
+ @Test(enabled = false)
+ public void exampleEndToEndTracing() throws Exception {
+ // ===== HTTP Service =====
+ // When the HTTP framework has OpenTelemetry instrumentation,
+ // the active span context automatically propagates to Pulsar
+
+ // Producer - trace context automatically injected from HTTP span
+ // producer.newMessage()
+ //         .value("Message from HTTP request")
+ //         .send();
+
+ // ===== Consumer Service =====
+ // In another service/application
+
+ // Consumer with tracing
+ // Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ //         .topic("my-topic")
+ //         .subscriptionName("my-subscription")
+ //         .intercept(new OpenTelemetryConsumerInterceptor<>())
+ //         .subscribe();
+
+ // Process messages - trace continues from HTTP request
+ // Message<String> msg = consumer.receive();
+ // // Trace context is automatically extracted from message properties
+ // processMessage(msg.getValue());
+ // consumer.acknowledge(msg);
+
+ // The entire flow from HTTP request -> Producer -> Consumer is now traced!
+ }
+
+ /**
+ * Example 5: Custom span creation in message processing.
+ * Wraps the application's processing logic in its own INTERNAL span.
+ */
+ @Test(enabled = false)
+ public void exampleCustomSpanCreation() throws Exception {
+ // When you need to create custom spans during message processing
+
+ // Consumer with tracing
+ // Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ //         .topic("my-topic")
+ //         .subscriptionName("my-subscription")
+ //         .intercept(new OpenTelemetryConsumerInterceptor<>())
+ //         .subscribe();
+
+ // Get tracer
+ // Tracer tracer = GlobalOpenTelemetry.get().getTracer("my-application");
+
+ // Process messages
+ // Message<String> msg = consumer.receive();
+
+ // The consumer interceptor already created a span, so we're in that context
+ // NOTE(review): confirm that the interceptor's span is actually current on the receiving thread;
+ // the span builder below sets no explicit parent, so it relies on the current context.
+ // Create a child span for custom processing
+ // Span processingSpan = tracer.spanBuilder("process-message")
+ //         .setSpanKind(SpanKind.INTERNAL)
+ //         .startSpan();
+
+ // NOTE(review): acknowledge() sits in the finally block, so the message is acknowledged even when
+ // processing throws - confirm this is intended, or move it into the try block.
+ // try (Scope scope = processingSpan.makeCurrent()) {
+ //     // Your processing logic here
+ //     // Any spans created here will be children of processingSpan
+ //     doSomeProcessing(msg.getValue());
+ //     processingSpan.setStatus(StatusCode.OK);
+ // } catch (Exception e) {
+ //     processingSpan.recordException(e);
+ //     processingSpan.setStatus(StatusCode.ERROR);
+ //     throw e;
+ // } finally {
+ //     processingSpan.end();
+ //     consumer.acknowledge(msg);
+ // }
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index cb4c93f153f..40876e57df9 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
@@ -103,6 +104,7 @@ public class ContextImplTest {
producer = mock(Producer.class);
client = mock(PulsarClientImpl.class);
+ when(client.getConfiguration()).thenReturn(new
ClientConfigurationData());
ConnectionPool connectionPool = mock(ConnectionPool.class);
when(client.getCnxPool()).thenReturn(connectionPool);
when(client.newProducer()).thenAnswer(invocation -> new
ProducerBuilderImpl(client, Schema.BYTES));