This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 45d02cd627aa feat(azure-servicebus): Ensure Content-Type header is
propagated into AMQP Message. (#19664)
45d02cd627aa is described below
commit 45d02cd627aae8a693a0d6436294509a44a1bff9
Author: Fernando Matias Balieiro <[email protected]>
AuthorDate: Thu Oct 23 09:48:34 2025 +0100
feat(azure-servicebus): Ensure Content-Type header is propagated into AMQP
Message. (#19664)
---
.../azure/servicebus/ServiceBusUtils.java | 7 +++
.../azure/servicebus/ServiceBusUtilsTest.java | 54 ++++++++++++++++++++++
.../integration/ServiceBusProducerIT.java | 12 +++++
3 files changed, 73 insertions(+)
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
index db4a1a1ef7a2..d1ec5b4f3c5b 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
@@ -24,6 +24,8 @@ import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusMessage;
import org.apache.camel.util.ObjectHelper;
+import static org.apache.camel.Exchange.CONTENT_TYPE;
+
public final class ServiceBusUtils {
private ServiceBusUtils() {
@@ -44,6 +46,11 @@ public final class ServiceBusUtils {
}
if (applicationProperties != null) {
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().putAll(applicationProperties);
+
+ final Object contentType =
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().get(CONTENT_TYPE);
+ if (contentType != null) {
+
serviceBusMessage.getRawAmqpMessage().getProperties().setContentType(contentType.toString());
+ }
}
if (ObjectHelper.isNotEmpty(correlationId)) {
serviceBusMessage.setCorrelationId(correlationId);
diff --git
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
index 5c80d0033631..5eed3d8d5ec2 100644
---
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
+++
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
@@ -20,18 +20,23 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.stream.StreamSupport;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
+import org.apache.camel.Exchange;
import org.junit.jupiter.api.Test;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
public class ServiceBusUtilsTest {
+ private static final String APPLICATION_JSON_CONTENT_TYPE =
"application/json";
+
@Test
void testCreateServiceBusMessage() {
// test string
@@ -137,6 +142,55 @@ public class ServiceBusUtilsTest {
.anyMatch(record ->
record.getSessionId().equals("session-2")));
}
+ @Test
+ void testCreateServiceBusMessageWithContentType() {
+ final Map<String, Object> applicationProperties = Map.of(
+ Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
+
+ final ServiceBusMessage message
+ = ServiceBusUtils.createServiceBusMessage("test string",
applicationProperties, null, null);
+
+ assertEquals("test string", message.getBody().toString());
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
+ }
+
+ @Test
+ void testCreateServiceBusMessagesWithContentType() {
+ final Map<String, Object> applicationProperties = Map.of(
+ Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
+
+ final List<String> inputMessages = new LinkedList<>();
+ inputMessages.add("test data");
+ inputMessages.add(String.valueOf(12345));
+
+ final Iterable<ServiceBusMessage> busMessages
+ = ServiceBusUtils.createServiceBusMessages(inputMessages,
applicationProperties, null, null);
+
+ assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
+ .anyMatch(record -> record.getBody().toString().equals("test
data")));
+ assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
+ .anyMatch(record ->
record.getBody().toString().equals("12345")));
+ assertThat(StreamSupport.stream(busMessages.spliterator(), false))
+ .allMatch(message ->
APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));
+
+ //Test bytes
+ final List<byte[]> inputMessages2 = new LinkedList<>();
+ byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
+ byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
+ inputMessages2.add(byteBody1);
+ inputMessages2.add(byteBody2);
+
+ final Iterable<ServiceBusMessage> busMessages2
+ = ServiceBusUtils.createServiceBusMessages(inputMessages2,
applicationProperties, null, null);
+
+ assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
+ .anyMatch(message ->
Arrays.equals(message.getBody().toBytes(), byteBody1)));
+ assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
+ .anyMatch(message ->
Arrays.equals(message.getBody().toBytes(), byteBody2)));
+ assertThat(StreamSupport.stream(busMessages.spliterator(), false))
+ .allMatch(message ->
APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));
+ }
+
@Test
void validateConfigurationMissingCredentials() {
assertThrows(IllegalArgumentException.class,
diff --git
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
index 97619fda8302..7e450c9a636e 100644
---
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
+++
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
@@ -25,6 +25,7 @@ import java.util.regex.Pattern;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.azure.servicebus.ServiceBusConstants;
@@ -47,6 +48,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
private static final String DIRECT_SEND_TO_SESSION_QUEUE_URI =
"direct:sendToQueueSessions";
private static final Map<String, Object> PROPAGATED_HEADERS = new
HashMap<>();
private static final Pattern MESSAGE_BODY_PATTERN =
Pattern.compile("^message-[0-4]$");
+ private static final String APPLICATION_JSON_CONTENT_TYPE =
"application/json";
static {
PROPAGATED_HEADERS.put("booleanHeader", true);
@@ -60,6 +62,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
PROPAGATED_HEADERS.put("stringHeader", "stringHeader");
PROPAGATED_HEADERS.put("timestampHeader", new Date());
PROPAGATED_HEADERS.put("uuidHeader", UUID.randomUUID());
+ PROPAGATED_HEADERS.put(Exchange.CONTENT_TYPE,
APPLICATION_JSON_CONTENT_TYPE);
}
private ProducerTemplate producerTemplate;
@@ -120,6 +123,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
});
}
}
@@ -148,6 +152,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
});
}
}
@@ -170,6 +175,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
});
}
}
@@ -194,6 +200,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
assertInstanceOf(OffsetDateTime.class,
message.getScheduledEnqueueTime());
});
}
@@ -225,6 +232,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
assertInstanceOf(OffsetDateTime.class,
message.getScheduledEnqueueTime());
});
}
@@ -252,6 +260,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
});
}
}
@@ -280,6 +289,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
});
}
}
@@ -324,6 +334,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
assertInstanceOf(OffsetDateTime.class,
message.getScheduledEnqueueTime());
});
}
@@ -355,6 +366,7 @@ public class ServiceBusProducerIT extends
BaseServiceBusTestSupport {
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
Map<String, Object> applicationProperties =
message.getApplicationProperties();
assertEquals(PROPAGATED_HEADERS, applicationProperties);
+ assertEquals(APPLICATION_JSON_CONTENT_TYPE,
message.getContentType());
assertInstanceOf(OffsetDateTime.class,
message.getScheduledEnqueueTime());
});
}