This is an automated email from the ASF dual-hosted git repository.
bvahdat pushed a commit to branch CAMEL-18055
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/CAMEL-18055 by this push:
new b7931e5f0de CAMEL-18055: Create tracing SpanDecorator for
ServiceBusComponent
b7931e5f0de is described below
commit b7931e5f0de1be0960ecdc2adc23f117324ec93d
Author: Babak Vahdat <[email protected]>
AuthorDate: Thu May 5 21:11:17 2022 +0200
CAMEL-18055: Create tracing SpanDecorator for ServiceBusComponent
---
.../org.apache.camel.tracing.SpanDecorator | 1 +
.../decorators/ServiceBusSpanDecorator.java | 108 +++++++++++++++++++++
.../decorators/ServiceBusSpanDecoratorTest.java | 81 ++++++++++++++++
3 files changed, 190 insertions(+)
diff --git
a/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
b/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
index 39b88a064e5..e575ef9978a 100644
---
a/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
+++
b/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
@@ -42,6 +42,7 @@ org.apache.camel.tracing.decorators.PlatformHttpSpanDecorator
org.apache.camel.tracing.decorators.RabbitmqSpanDecorator
org.apache.camel.tracing.decorators.RestSpanDecorator
org.apache.camel.tracing.decorators.SedaSpanDecorator
+org.apache.camel.tracing.decorators.ServiceBusSpanDecorator
org.apache.camel.tracing.decorators.ServletSpanDecorator
org.apache.camel.tracing.decorators.SjmsSpanDecorator
org.apache.camel.tracing.decorators.Sjms2SpanDecorator
diff --git
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
new file mode 100644
index 00000000000..5a38f4489a0
--- /dev/null
+++
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
@@ -0,0 +1,128 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.tracing.SpanAdapter;
+
+/**
+ * Span decorator for the {@code azure-servicebus} component.
+ * <p>
+ * {@link #pre(SpanAdapter, Exchange, Endpoint)} delegates to the superclass and then copies the Azure Service Bus
+ * headers found on the exchange's incoming message onto the span as tags.
+ */
+public class ServiceBusSpanDecorator extends AbstractMessagingSpanDecorator {
+
+    // Names of the span tags written by pre().
+    static final String SERVICEBUS_CONTENT_TYPE = "contentType";
+    static final String SERVICEBUS_CORRELATION_ID = "correlationId";
+    static final String SERVICEBUS_DELIVERY_COUNT = "deliveryCount";
+    static final String SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER = "enqueuedSequenceNumber";
+    static final String SERVICEBUS_ENQUEUED_TIME = "enqueuedTime";
+    static final String SERVICEBUS_EXPIRES_AT = "expiresAt";
+    static final String SERVICEBUS_PARTITION_KEY = "partitionKey";
+    static final String SERVICEBUS_REPLY_TO_SESSION_ID = "replyToSessionId";
+    static final String SERVICEBUS_SESSION_ID = "sessionId";
+    static final String SERVICEBUS_TIME_TO_LIVE = "ttl";
+
+    /**
+     * Constants copied from {@link org.apache.camel.component.azure.servicebus.ServiceBusConstants}
+     */
+    static final String CONTENT_TYPE = "CamelAzureServiceBusContentType";
+    static final String CORRELATION_ID = "CamelAzureServiceBusCorrelationId";
+    static final String DELIVERY_COUNT = "CamelAzureServiceBusDeliveryCount";
+    static final String ENQUEUED_SEQUENCE_NUMBER = "CamelAzureServiceBusEnqueuedSequenceNumber";
+    static final String ENQUEUED_TIME = "CamelAzureServiceBusEnqueuedTime";
+    static final String EXPIRES_AT = "CamelAzureServiceBusExpiresAt";
+    static final String MESSAGE_ID = "CamelAzureServiceBusMessageId";
+    static final String SESSION_ID = "CamelAzureServiceBusSessionId";
+    static final String REPLY_TO_SESSION_ID = "CamelAzureServiceBusReplyToSessionId";
+    static final String PARTITION_KEY = "CamelAzureServiceBusPartitionKey";
+    static final String TIME_TO_LIVE = "CamelAzureServiceBusTimeToLive";
+
+    /** Returns the component name, {@code azure-servicebus}. */
+    @Override
+    public String getComponent() {
+        return "azure-servicebus";
+    }
+
+    /** Returns the fully qualified class name of the Azure Service Bus component. */
+    @Override
+    public String getComponentClassName() {
+        return "org.apache.camel.component.azure.servicebus.ServiceBusComponent";
+    }
+
+    /**
+     * Delegates to the superclass, then tags the span with each Service Bus header present on the exchange's
+     * incoming message. A header that resolves to {@code null} produces no tag.
+     *
+     * @param span     the span to tag
+     * @param exchange the exchange whose incoming message headers are read
+     * @param endpoint the endpoint, passed through to the superclass
+     */
+    @Override
+    public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) {
+        super.pre(span, exchange, endpoint);
+
+        String contentType = exchange.getIn().getHeader(CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            span.setTag(SERVICEBUS_CONTENT_TYPE, contentType);
+        }
+
+        String correlationId = exchange.getIn().getHeader(CORRELATION_ID, String.class);
+        if (correlationId != null) {
+            span.setTag(SERVICEBUS_CORRELATION_ID, correlationId);
+        }
+
+        Long deliveryCount = exchange.getIn().getHeader(DELIVERY_COUNT, Long.class);
+        if (deliveryCount != null) {
+            span.setTag(SERVICEBUS_DELIVERY_COUNT, deliveryCount);
+        }
+
+        // The enqueued sequence number is a numeric value (a long in the Azure Service Bus SDK), not a
+        // timestamp, so it is read as a Long and tagged as a number, exactly like the delivery count.
+        Long enqueuedSequenceNumber = exchange.getIn().getHeader(ENQUEUED_SEQUENCE_NUMBER, Long.class);
+        if (enqueuedSequenceNumber != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER, enqueuedSequenceNumber);
+        }
+
+        OffsetDateTime enqueuedTime = exchange.getIn().getHeader(ENQUEUED_TIME, OffsetDateTime.class);
+        if (enqueuedTime != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_TIME, enqueuedTime.toString());
+        }
+
+        OffsetDateTime expiresAt = exchange.getIn().getHeader(EXPIRES_AT, OffsetDateTime.class);
+        if (expiresAt != null) {
+            span.setTag(SERVICEBUS_EXPIRES_AT, expiresAt.toString());
+        }
+
+        String partitionKey = exchange.getIn().getHeader(PARTITION_KEY, String.class);
+        if (partitionKey != null) {
+            span.setTag(SERVICEBUS_PARTITION_KEY, partitionKey);
+        }
+
+        String replyToSessionId = exchange.getIn().getHeader(REPLY_TO_SESSION_ID, String.class);
+        if (replyToSessionId != null) {
+            span.setTag(SERVICEBUS_REPLY_TO_SESSION_ID, replyToSessionId);
+        }
+
+        String sessionId = exchange.getIn().getHeader(SESSION_ID, String.class);
+        if (sessionId != null) {
+            span.setTag(SERVICEBUS_SESSION_ID, sessionId);
+        }
+
+        Duration timeToLive = exchange.getIn().getHeader(TIME_TO_LIVE, Duration.class);
+        if (timeToLive != null) {
+            span.setTag(SERVICEBUS_TIME_TO_LIVE, timeToLive.toString());
+        }
+    }
+
+    /** Returns the {@code CamelAzureServiceBusMessageId} header of the incoming message as a string. */
+    @Override
+    protected String getMessageId(Exchange exchange) {
+        return exchange.getIn().getHeader(MESSAGE_ID, String.class);
+    }
+
+}
diff --git a/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
new file mode 100644
index 00000000000..3006673d7ca
--- /dev/null
+++ b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
@@ -0,0 +1,86 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.tracing.MockSpanAdapter;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link ServiceBusSpanDecorator}.
+ */
+public class ServiceBusSpanDecoratorTest {
+
+    /** The message id is read from the {@code CamelAzureServiceBusMessageId} header. */
+    @Test
+    public void testGetMessageId() {
+        String messageId = "abcd";
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.MESSAGE_ID, String.class)).thenReturn(messageId);
+
+        AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+        assertEquals(messageId, decorator.getMessageId(exchange));
+    }
+
+    /** Every Service Bus header stubbed on the message ends up as the matching span tag. */
+    @Test
+    public void testPre() {
+        String contentType = "application/json";
+        String correlationId = "1234";
+        Long deliveryCount = 27L;
+        // The enqueued sequence number is numeric, like the delivery count.
+        Long enqueuedSequenceNumber = 42L;
+        OffsetDateTime enqueuedTime = OffsetDateTime.now();
+        OffsetDateTime expiresAt = OffsetDateTime.now();
+        String partitionKey = "MyPartitionKey";
+        String replyToSessionId = "MyReplyToSessionId";
+        String sessionId = "4321";
+        Duration ttl = Duration.ofDays(7);
+
+        Endpoint endpoint = Mockito.mock(Endpoint.class);
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        Mockito.when(endpoint.getEndpointUri()).thenReturn("azure-servicebus:topicOrQueueName");
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CONTENT_TYPE, String.class)).thenReturn(contentType);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CORRELATION_ID, String.class)).thenReturn(correlationId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.DELIVERY_COUNT, Long.class)).thenReturn(deliveryCount);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_SEQUENCE_NUMBER, Long.class))
+                .thenReturn(enqueuedSequenceNumber);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_TIME, OffsetDateTime.class)).thenReturn(enqueuedTime);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.EXPIRES_AT, OffsetDateTime.class)).thenReturn(expiresAt);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.PARTITION_KEY, String.class)).thenReturn(partitionKey);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.REPLY_TO_SESSION_ID, String.class)).thenReturn(replyToSessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.SESSION_ID, String.class)).thenReturn(sessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.TIME_TO_LIVE, Duration.class)).thenReturn(ttl);
+
+        AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+        MockSpanAdapter span = new MockSpanAdapter();
+
+        decorator.pre(span, exchange, endpoint);
+
+        assertEquals(contentType, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CONTENT_TYPE));
+        assertEquals(correlationId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CORRELATION_ID));
+        assertEquals(deliveryCount, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_DELIVERY_COUNT));
+        assertEquals(enqueuedSequenceNumber, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER));
+        assertEquals(enqueuedTime.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_TIME));
+        assertEquals(expiresAt.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_EXPIRES_AT));
+        assertEquals(partitionKey, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_PARTITION_KEY));
+        assertEquals(replyToSessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_REPLY_TO_SESSION_ID));
+        assertEquals(sessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_SESSION_ID));
+        assertEquals(ttl.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_TIME_TO_LIVE));
+    }
+
+}