This is an automated email from the ASF dual-hosted git repository.

bvahdat pushed a commit to branch CAMEL-18055
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/CAMEL-18055 by this push:
     new b7931e5f0de CAMEL-18055: Create tracing SpanDecorator for 
ServiceBusComponent
b7931e5f0de is described below

commit b7931e5f0de1be0960ecdc2adc23f117324ec93d
Author: Babak Vahdat <[email protected]>
AuthorDate: Thu May 5 21:11:17 2022 +0200

    CAMEL-18055: Create tracing SpanDecorator for ServiceBusComponent
---
 .../org.apache.camel.tracing.SpanDecorator         |   1 +
 .../decorators/ServiceBusSpanDecorator.java        | 108 +++++++++++++++++++++
 .../decorators/ServiceBusSpanDecoratorTest.java    |  81 ++++++++++++++++
 3 files changed, 190 insertions(+)

diff --git 
a/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
 
b/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
index 39b88a064e5..e575ef9978a 100644
--- 
a/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
+++ 
b/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
@@ -42,6 +42,7 @@ org.apache.camel.tracing.decorators.PlatformHttpSpanDecorator
 org.apache.camel.tracing.decorators.RabbitmqSpanDecorator
 org.apache.camel.tracing.decorators.RestSpanDecorator
 org.apache.camel.tracing.decorators.SedaSpanDecorator
+org.apache.camel.tracing.decorators.ServiceBusSpanDecorator
 org.apache.camel.tracing.decorators.ServletSpanDecorator
 org.apache.camel.tracing.decorators.SjmsSpanDecorator
 org.apache.camel.tracing.decorators.Sjms2SpanDecorator
diff --git 
a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
new file mode 100644
index 00000000000..5a38f4489a0
--- /dev/null
+++ 
b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
@@ -0,0 +1,108 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.tracing.SpanAdapter;
+
+public class ServiceBusSpanDecorator extends AbstractMessagingSpanDecorator {
+
+    static final String SERVICEBUS_CONTENT_TYPE = "contentType";
+    static final String SERVICEBUS_CORRELATION_ID = "correlationId";
+    static final String SERVICEBUS_DELIVERY_COUNT = "deliveryCount";
+    static final String SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER = 
"enqueuedSequenceNumber";
+    static final String SERVICEBUS_ENQUEUED_TIME = "enqueuedTime";
+    static final String SERVICEBUS_EXPIRES_AT = "expiresAt";
+    static final String SERVICEBUS_PARTITION_KEY = "partitionKey";
+    static final String SERVICEBUS_REPLY_TO_SESSION_ID = "replyToSessionId";
+    static final String SERVICEBUS_SESSION_ID = "sessionId";
+    static final String SERVICEBUS_TIME_TO_LIVE = "ttl";
+
+    /**
+     * Constants copied from {@link 
org.apache.camel.component.azure.servicebus.ServiceBusConstants}
+     */
+    static final String CONTENT_TYPE = "CamelAzureServiceBusContentType";
+    static final String CORRELATION_ID = "CamelAzureServiceBusCorrelationId";
+    static final String DELIVERY_COUNT = "CamelAzureServiceBusDeliveryCount";
+    static final String ENQUEUED_SEQUENCE_NUMBER = 
"CamelAzureServiceBusEnqueuedSequenceNumber";
+    static final String ENQUEUED_TIME = "CamelAzureServiceBusEnqueuedTime";
+    static final String EXPIRES_AT = "CamelAzureServiceBusExpiresAt";
+    static final String MESSAGE_ID = "CamelAzureServiceBusMessageId";
+    static final String SESSION_ID = "CamelAzureServiceBusSessionId";
+    static final String REPLY_TO_SESSION_ID = 
"CamelAzureServiceBusReplyToSessionId";
+    static final String PARTITION_KEY = "CamelAzureServiceBusPartitionKey";
+    static final String TIME_TO_LIVE = "CamelAzureServiceBusTimeToLive";
+
+    @Override
+    public String getComponent() {
+        return "azure-servicebus";
+    }
+
+    @Override
+    public String getComponentClassName() {
+        return 
"org.apache.camel.component.azure.servicebus.ServiceBusComponent";
+    }
+
+    @Override
+    public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) {
+        super.pre(span, exchange, endpoint);
+
+        String contentType = exchange.getIn().getHeader(CONTENT_TYPE, 
String.class);
+        if (contentType != null) {
+            span.setTag(SERVICEBUS_CONTENT_TYPE, contentType);
+        }
+
+        String correlationId = exchange.getIn().getHeader(CORRELATION_ID, 
String.class);
+        if (correlationId != null) {
+            span.setTag(SERVICEBUS_CORRELATION_ID, correlationId);
+        }
+
+        Long deliveryCount = exchange.getIn().getHeader(DELIVERY_COUNT, 
Long.class);
+        if (deliveryCount != null) {
+            span.setTag(SERVICEBUS_DELIVERY_COUNT, deliveryCount);
+        }
+
+        OffsetDateTime enqueuedSequenceNumber = 
exchange.getIn().getHeader(ENQUEUED_SEQUENCE_NUMBER, OffsetDateTime.class);
+        if (enqueuedSequenceNumber != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER, 
enqueuedSequenceNumber.toString());
+        }
+
+        OffsetDateTime enqueuedTime = 
exchange.getIn().getHeader(ENQUEUED_TIME, OffsetDateTime.class);
+        if (enqueuedTime != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_TIME, enqueuedTime.toString());
+        }
+
+        OffsetDateTime expiresAt = exchange.getIn().getHeader(EXPIRES_AT, 
OffsetDateTime.class);
+        if (expiresAt != null) {
+            span.setTag(SERVICEBUS_EXPIRES_AT, expiresAt.toString());
+        }
+
+        String partitionKey = exchange.getIn().getHeader(PARTITION_KEY, 
String.class);
+        if (partitionKey != null) {
+            span.setTag(SERVICEBUS_PARTITION_KEY, partitionKey);
+        }
+
+        String replyToSessionId = 
exchange.getIn().getHeader(REPLY_TO_SESSION_ID, String.class);
+        if (replyToSessionId != null) {
+            span.setTag(SERVICEBUS_REPLY_TO_SESSION_ID, replyToSessionId);
+        }
+
+        String sessionId = exchange.getIn().getHeader(SESSION_ID, 
String.class);
+        if (sessionId != null) {
+            span.setTag(SERVICEBUS_SESSION_ID, sessionId);
+        }
+
+        Duration timeToLive = exchange.getIn().getHeader(TIME_TO_LIVE, 
Duration.class);
+        if (timeToLive != null) {
+            span.setTag(SERVICEBUS_TIME_TO_LIVE, timeToLive.toString());
+        }
+    }
+
+    @Override
+    protected String getMessageId(Exchange exchange) {
+        return exchange.getIn().getHeader(MESSAGE_ID, String.class);
+    }
+
+}
diff --git 
a/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
 
b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
new file mode 100644
index 00000000000..3006673d7ca
--- /dev/null
+++ 
b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
@@ -0,0 +1,81 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.tracing.MockSpanAdapter;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ServiceBusSpanDecoratorTest {
+
+    @Test
+    public void testGetMessageId() {
+        String messageId = "abcd";
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.MESSAGE_ID, 
String.class)).thenReturn(messageId);
+
+        AbstractMessagingSpanDecorator decorator = new 
ServiceBusSpanDecorator();
+
+        assertEquals(messageId, decorator.getMessageId(exchange));
+    }
+
+    @Test
+    public void testPre() {
+        String contentType = "application/json";
+        String correlationId = "1234";
+        Long deliveryCount = 27L;
+        OffsetDateTime enqueuedSequenceNumber = OffsetDateTime.now();
+        OffsetDateTime enqueuedTime = OffsetDateTime.now();
+        OffsetDateTime expiresAt = OffsetDateTime.now();
+        String partitionKey = "MyPartitionKey";
+        String replyToSessionId = "MyReplyToSessionId";
+        String sessionId = "4321";
+        Duration ttl = Duration.ofDays(7);
+
+        Endpoint endpoint = Mockito.mock(Endpoint.class);
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        
Mockito.when(endpoint.getEndpointUri()).thenReturn("azure-servicebus:topicOrQueueName");
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CONTENT_TYPE, 
String.class)).thenReturn(contentType);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CORRELATION_ID, 
String.class)).thenReturn(correlationId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.DELIVERY_COUNT, 
Long.class)).thenReturn(deliveryCount);
+        
Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_SEQUENCE_NUMBER,
 OffsetDateTime.class))
+                .thenReturn(enqueuedSequenceNumber);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_TIME, 
OffsetDateTime.class)).thenReturn(enqueuedTime);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.EXPIRES_AT, 
OffsetDateTime.class)).thenReturn(expiresAt);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.PARTITION_KEY, 
String.class)).thenReturn(partitionKey);
+        
Mockito.when(message.getHeader(ServiceBusSpanDecorator.REPLY_TO_SESSION_ID, 
String.class)).thenReturn(replyToSessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.SESSION_ID, 
String.class)).thenReturn(sessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.TIME_TO_LIVE, 
Duration.class)).thenReturn(ttl);
+
+        AbstractMessagingSpanDecorator decorator = new 
ServiceBusSpanDecorator();
+
+        MockSpanAdapter span = new MockSpanAdapter();
+
+        decorator.pre(span, exchange, endpoint);
+
+        assertEquals(contentType, 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CONTENT_TYPE));
+        assertEquals(correlationId, 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CORRELATION_ID));
+        assertEquals(deliveryCount, 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_DELIVERY_COUNT));
+        assertEquals(enqueuedSequenceNumber.toString(),
+                
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER));
+        assertEquals(enqueuedTime.toString(), 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_TIME));
+        assertEquals(expiresAt.toString(), 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_EXPIRES_AT));
+        assertEquals(partitionKey, 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_PARTITION_KEY));
+        assertEquals(replyToSessionId, 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_REPLY_TO_SESSION_ID));
+        assertEquals(sessionId, 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_SESSION_ID));
+        assertEquals(ttl.toString(), 
span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_TIME_TO_LIVE));
+    }
+
+}

Reply via email to