This is an automated email from the ASF dual-hosted git repository. bvahdat pushed a commit to branch CAMEL-18055 in repository https://gitbox.apache.org/repos/asf/camel.git
commit a7233608ac7bbb4bf403bd03d7690640d01d62c2
Author: Babak Vahdat <[email protected]>
AuthorDate: Thu May 5 21:11:17 2022 +0200

    CAMEL-18055: Create tracing SpanDecorator for ServiceBusComponent
---
 .../org.apache.camel.tracing.SpanDecorator         |   1 +
 .../decorators/ServiceBusSpanDecorator.java        | 121 +++++++++++++++++++++
 .../decorators/ServiceBusSpanDecoratorTest.java    |  81 ++++++++++++++
 3 files changed, 203 insertions(+)

diff --git a/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator b/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
index 39b88a064e5..e575ef9978a 100644
--- a/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
+++ b/components/camel-opentelemetry/src/main/resources/META-INF/services/org.apache.camel.tracing.SpanDecorator
@@ -42,6 +42,7 @@ org.apache.camel.tracing.decorators.PlatformHttpSpanDecorator
 org.apache.camel.tracing.decorators.RabbitmqSpanDecorator
 org.apache.camel.tracing.decorators.RestSpanDecorator
 org.apache.camel.tracing.decorators.SedaSpanDecorator
+org.apache.camel.tracing.decorators.ServiceBusSpanDecorator
 org.apache.camel.tracing.decorators.ServletSpanDecorator
 org.apache.camel.tracing.decorators.SjmsSpanDecorator
 org.apache.camel.tracing.decorators.Sjms2SpanDecorator
diff --git a/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
new file mode 100644
index 00000000000..5a38f4489a0
--- /dev/null
+++ b/components/camel-tracing/src/main/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecorator.java
@@ -0,0 +1,121 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.tracing.SpanAdapter;
+
+/**
+ * Span decorator for the {@code azure-servicebus} component. On top of the inherited messaging decoration it
+ * copies the Service Bus headers found on the exchange's in-message onto the span as tags.
+ */
+public class ServiceBusSpanDecorator extends AbstractMessagingSpanDecorator {
+
+    // Names of the span tags set by this decorator.
+    static final String SERVICEBUS_CONTENT_TYPE = "contentType";
+    static final String SERVICEBUS_CORRELATION_ID = "correlationId";
+    static final String SERVICEBUS_DELIVERY_COUNT = "deliveryCount";
+    static final String SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER = "enqueuedSequenceNumber";
+    static final String SERVICEBUS_ENQUEUED_TIME = "enqueuedTime";
+    static final String SERVICEBUS_EXPIRES_AT = "expiresAt";
+    static final String SERVICEBUS_PARTITION_KEY = "partitionKey";
+    static final String SERVICEBUS_REPLY_TO_SESSION_ID = "replyToSessionId";
+    static final String SERVICEBUS_SESSION_ID = "sessionId";
+    static final String SERVICEBUS_TIME_TO_LIVE = "ttl";
+
+    /**
+     * Constants copied from {@link org.apache.camel.component.azure.servicebus.ServiceBusConstants}
+     */
+    static final String CONTENT_TYPE = "CamelAzureServiceBusContentType";
+    static final String CORRELATION_ID = "CamelAzureServiceBusCorrelationId";
+    static final String DELIVERY_COUNT = "CamelAzureServiceBusDeliveryCount";
+    static final String ENQUEUED_SEQUENCE_NUMBER = "CamelAzureServiceBusEnqueuedSequenceNumber";
+    static final String ENQUEUED_TIME = "CamelAzureServiceBusEnqueuedTime";
+    static final String EXPIRES_AT = "CamelAzureServiceBusExpiresAt";
+    static final String MESSAGE_ID = "CamelAzureServiceBusMessageId";
+    static final String SESSION_ID = "CamelAzureServiceBusSessionId";
+    static final String REPLY_TO_SESSION_ID = "CamelAzureServiceBusReplyToSessionId";
+    static final String PARTITION_KEY = "CamelAzureServiceBusPartitionKey";
+    static final String TIME_TO_LIVE = "CamelAzureServiceBusTimeToLive";
+
+    @Override
+    public String getComponent() {
+        return "azure-servicebus";
+    }
+
+    @Override
+    public String getComponentClassName() {
+        return "org.apache.camel.component.azure.servicebus.ServiceBusComponent";
+    }
+
+    /**
+     * Runs the inherited decoration, then adds one tag for each Service Bus header present on the in-message.
+     * Headers that are missing (resolve to {@code null}) are skipped.
+     */
+    @Override
+    public void pre(SpanAdapter span, Exchange exchange, Endpoint endpoint) {
+        super.pre(span, exchange, endpoint);
+
+        String contentType = exchange.getIn().getHeader(CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            span.setTag(SERVICEBUS_CONTENT_TYPE, contentType);
+        }
+
+        String correlationId = exchange.getIn().getHeader(CORRELATION_ID, String.class);
+        if (correlationId != null) {
+            span.setTag(SERVICEBUS_CORRELATION_ID, correlationId);
+        }
+
+        Long deliveryCount = exchange.getIn().getHeader(DELIVERY_COUNT, Long.class);
+        if (deliveryCount != null) {
+            span.setTag(SERVICEBUS_DELIVERY_COUNT, deliveryCount);
+        }
+
+        // The enqueued sequence number is numeric, not a timestamp: read and tag it like the delivery count.
+        Long enqueuedSequenceNumber = exchange.getIn().getHeader(ENQUEUED_SEQUENCE_NUMBER, Long.class);
+        if (enqueuedSequenceNumber != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER, enqueuedSequenceNumber);
+        }
+
+        OffsetDateTime enqueuedTime = exchange.getIn().getHeader(ENQUEUED_TIME, OffsetDateTime.class);
+        if (enqueuedTime != null) {
+            span.setTag(SERVICEBUS_ENQUEUED_TIME, enqueuedTime.toString());
+        }
+
+        OffsetDateTime expiresAt = exchange.getIn().getHeader(EXPIRES_AT, OffsetDateTime.class);
+        if (expiresAt != null) {
+            span.setTag(SERVICEBUS_EXPIRES_AT, expiresAt.toString());
+        }
+
+        String partitionKey = exchange.getIn().getHeader(PARTITION_KEY, String.class);
+        if (partitionKey != null) {
+            span.setTag(SERVICEBUS_PARTITION_KEY, partitionKey);
+        }
+
+        String replyToSessionId = exchange.getIn().getHeader(REPLY_TO_SESSION_ID, String.class);
+        if (replyToSessionId != null) {
+            span.setTag(SERVICEBUS_REPLY_TO_SESSION_ID, replyToSessionId);
+        }
+
+        String sessionId = exchange.getIn().getHeader(SESSION_ID, String.class);
+        if (sessionId != null) {
+            span.setTag(SERVICEBUS_SESSION_ID, sessionId);
+        }
+
+        Duration timeToLive = exchange.getIn().getHeader(TIME_TO_LIVE, Duration.class);
+        if (timeToLive != null) {
+            span.setTag(SERVICEBUS_TIME_TO_LIVE, timeToLive.toString());
+        }
+    }
+
+    /**
+     * Reads the message id from the Service Bus message id header of the exchange's in-message.
+     */
+    @Override
+    protected String getMessageId(Exchange exchange) {
+        return exchange.getIn().getHeader(MESSAGE_ID, String.class);
+    }
+
+}
diff --git a/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
new file mode 100644
index 00000000000..3006673d7ca
--- /dev/null
+++ b/components/camel-tracing/src/test/java/org/apache/camel/tracing/decorators/ServiceBusSpanDecoratorTest.java
@@ -0,0 +1,81 @@
+package org.apache.camel.tracing.decorators;
+
+import java.time.Duration;
+import java.time.OffsetDateTime;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.tracing.MockSpanAdapter;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ServiceBusSpanDecoratorTest {
+
+    @Test
+    public void testGetMessageId() {
+        String messageId = "abcd";
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.MESSAGE_ID, String.class)).thenReturn(messageId);
+
+        AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+        assertEquals(messageId, decorator.getMessageId(exchange));
+    }
+
+    @Test
+    public void testPre() {
+        String contentType = "application/json";
+        String correlationId = "1234";
+        Long deliveryCount = 27L;
+        Long enqueuedSequenceNumber = 1L;
+        OffsetDateTime enqueuedTime = OffsetDateTime.now();
+        OffsetDateTime expiresAt = OffsetDateTime.now();
+        String partitionKey = "MyPartitionKey";
+        String replyToSessionId = "MyReplyToSessionId";
+        String sessionId = "4321";
+        Duration ttl = Duration.ofDays(7);
+
+        Endpoint endpoint = Mockito.mock(Endpoint.class);
+        Exchange exchange = Mockito.mock(Exchange.class);
+        Message message = Mockito.mock(Message.class);
+
+        Mockito.when(endpoint.getEndpointUri()).thenReturn("azure-servicebus:topicOrQueueName");
+        Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CONTENT_TYPE, String.class)).thenReturn(contentType);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.CORRELATION_ID, String.class)).thenReturn(correlationId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.DELIVERY_COUNT, Long.class)).thenReturn(deliveryCount);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_SEQUENCE_NUMBER, Long.class))
+                .thenReturn(enqueuedSequenceNumber);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.ENQUEUED_TIME, OffsetDateTime.class)).thenReturn(enqueuedTime);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.EXPIRES_AT, OffsetDateTime.class)).thenReturn(expiresAt);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.PARTITION_KEY, String.class)).thenReturn(partitionKey);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.REPLY_TO_SESSION_ID, String.class)).thenReturn(replyToSessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.SESSION_ID, String.class)).thenReturn(sessionId);
+        Mockito.when(message.getHeader(ServiceBusSpanDecorator.TIME_TO_LIVE, Duration.class)).thenReturn(ttl);
+
+        AbstractMessagingSpanDecorator decorator = new ServiceBusSpanDecorator();
+
+        MockSpanAdapter span = new MockSpanAdapter();
+
+        decorator.pre(span, exchange, endpoint);
+
+        assertEquals(contentType, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CONTENT_TYPE));
+        assertEquals(correlationId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_CORRELATION_ID));
+        assertEquals(deliveryCount, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_DELIVERY_COUNT));
+        assertEquals(enqueuedSequenceNumber,
+                span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_SEQUENCE_NUMBER));
+        assertEquals(enqueuedTime.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_ENQUEUED_TIME));
+        assertEquals(expiresAt.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_EXPIRES_AT));
+        assertEquals(partitionKey, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_PARTITION_KEY));
+        assertEquals(replyToSessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_REPLY_TO_SESSION_ID));
+        assertEquals(sessionId, span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_SESSION_ID));
+        assertEquals(ttl.toString(), span.tags().get(ServiceBusSpanDecorator.SERVICEBUS_TIME_TO_LIVE));
+    }
+
+}
