This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 45d02cd627aa feat(azure-servicebus): Ensure Content-Type header is 
propagated into AMQP Message. (#19664)
45d02cd627aa is described below

commit 45d02cd627aae8a693a0d6436294509a44a1bff9
Author: Fernando Matias Balieiro <[email protected]>
AuthorDate: Thu Oct 23 09:48:34 2025 +0100

    feat(azure-servicebus): Ensure Content-Type header is propagated into AMQP 
Message. (#19664)
---
 .../azure/servicebus/ServiceBusUtils.java          |  7 +++
 .../azure/servicebus/ServiceBusUtilsTest.java      | 54 ++++++++++++++++++++++
 .../integration/ServiceBusProducerIT.java          | 12 +++++
 3 files changed, 73 insertions(+)

diff --git 
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
 
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
index db4a1a1ef7a2..d1ec5b4f3c5b 100644
--- 
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
+++ 
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
@@ -24,6 +24,8 @@ import com.azure.core.util.BinaryData;
 import com.azure.messaging.servicebus.ServiceBusMessage;
 import org.apache.camel.util.ObjectHelper;
 
+import static org.apache.camel.Exchange.CONTENT_TYPE;
+
 public final class ServiceBusUtils {
 
     private ServiceBusUtils() {
@@ -44,6 +46,11 @@ public final class ServiceBusUtils {
         }
         if (applicationProperties != null) {
             
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().putAll(applicationProperties);
+
+            final Object contentType = 
serviceBusMessage.getRawAmqpMessage().getApplicationProperties().get(CONTENT_TYPE);
+            if (contentType != null) {
+                
serviceBusMessage.getRawAmqpMessage().getProperties().setContentType(contentType.toString());
+            }
         }
         if (ObjectHelper.isNotEmpty(correlationId)) {
             serviceBusMessage.setCorrelationId(correlationId);
diff --git 
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
 
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
index 5c80d0033631..5eed3d8d5ec2 100644
--- 
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
+++ 
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
@@ -20,18 +20,23 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.StreamSupport;
 
 import com.azure.messaging.servicebus.ServiceBusClientBuilder;
 import com.azure.messaging.servicebus.ServiceBusMessage;
 import com.azure.messaging.servicebus.ServiceBusProcessorClient;
 import com.azure.messaging.servicebus.ServiceBusSenderClient;
+import org.apache.camel.Exchange;
 import org.junit.jupiter.api.Test;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.*;
 
 public class ServiceBusUtilsTest {
 
+    private static final String APPLICATION_JSON_CONTENT_TYPE = 
"application/json";
+
     @Test
     void testCreateServiceBusMessage() {
         // test string
@@ -137,6 +142,55 @@ public class ServiceBusUtilsTest {
                 .anyMatch(record -> 
record.getSessionId().equals("session-2")));
     }
 
+    @Test
+    void testCreateServiceBusMessageWithContentType() {
+        final Map<String, Object> applicationProperties = Map.of(
+                Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
+
+        final ServiceBusMessage message
+                = ServiceBusUtils.createServiceBusMessage("test string", 
applicationProperties, null, null);
+
+        assertEquals("test string", message.getBody().toString());
+        assertEquals(APPLICATION_JSON_CONTENT_TYPE, message.getContentType());
+    }
+
+    @Test
+    void testCreateServiceBusMessagesWithContentType() {
+        final Map<String, Object> applicationProperties = Map.of(
+                Exchange.CONTENT_TYPE, APPLICATION_JSON_CONTENT_TYPE);
+
+        final List<String> inputMessages = new LinkedList<>();
+        inputMessages.add("test data");
+        inputMessages.add(String.valueOf(12345));
+
+        final Iterable<ServiceBusMessage> busMessages
+                = ServiceBusUtils.createServiceBusMessages(inputMessages, 
applicationProperties, null, null);
+
+        assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
+                .anyMatch(record -> record.getBody().toString().equals("test 
data")));
+        assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
+                .anyMatch(record -> 
record.getBody().toString().equals("12345")));
+        assertThat(StreamSupport.stream(busMessages.spliterator(), false))
+                .allMatch(message -> 
APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));
+
+        //Test bytes
+        final List<byte[]> inputMessages2 = new LinkedList<>();
+        byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
+        byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
+        inputMessages2.add(byteBody1);
+        inputMessages2.add(byteBody2);
+
+        final Iterable<ServiceBusMessage> busMessages2
+                = ServiceBusUtils.createServiceBusMessages(inputMessages2, 
applicationProperties, null, null);
+
+        assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
+                .anyMatch(message -> 
Arrays.equals(message.getBody().toBytes(), byteBody1)));
+        assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
+                .anyMatch(message -> 
Arrays.equals(message.getBody().toBytes(), byteBody2)));
+        assertThat(StreamSupport.stream(busMessages.spliterator(), false))
+                .allMatch(message -> 
APPLICATION_JSON_CONTENT_TYPE.equals(message.getContentType()));
+    }
+
     @Test
     void validateConfigurationMissingCredentials() {
         assertThrows(IllegalArgumentException.class,
diff --git 
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
 
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
index 97619fda8302..7e450c9a636e 100644
--- 
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
+++ 
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/ServiceBusProducerIT.java
@@ -25,6 +25,7 @@ import java.util.regex.Pattern;
 import com.azure.messaging.servicebus.ServiceBusProcessorClient;
 import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.azure.servicebus.ServiceBusConstants;
@@ -47,6 +48,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
     private static final String DIRECT_SEND_TO_SESSION_QUEUE_URI = 
"direct:sendToQueueSessions";
     private static final Map<String, Object> PROPAGATED_HEADERS = new 
HashMap<>();
     private static final Pattern MESSAGE_BODY_PATTERN = 
Pattern.compile("^message-[0-4]$");
+    private static final String APPLICATION_JSON_CONTENT_TYPE = 
"application/json";
 
     static {
         PROPAGATED_HEADERS.put("booleanHeader", true);
@@ -60,6 +62,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
         PROPAGATED_HEADERS.put("stringHeader", "stringHeader");
         PROPAGATED_HEADERS.put("timestampHeader", new Date());
         PROPAGATED_HEADERS.put("uuidHeader", UUID.randomUUID());
+        PROPAGATED_HEADERS.put(Exchange.CONTENT_TYPE, 
APPLICATION_JSON_CONTENT_TYPE);
     }
 
     private ProducerTemplate producerTemplate;
@@ -120,6 +123,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
             });
         }
     }
@@ -148,6 +152,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
             });
         }
     }
@@ -170,6 +175,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
             });
         }
     }
@@ -194,6 +200,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
                 assertInstanceOf(OffsetDateTime.class, 
message.getScheduledEnqueueTime());
             });
         }
@@ -225,6 +232,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
                 assertInstanceOf(OffsetDateTime.class, 
message.getScheduledEnqueueTime());
             });
         }
@@ -252,6 +260,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
             });
         }
     }
@@ -280,6 +289,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
             });
         }
     }
@@ -324,6 +334,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
                 assertInstanceOf(OffsetDateTime.class, 
message.getScheduledEnqueueTime());
             });
         }
@@ -355,6 +366,7 @@ public class ServiceBusProducerIT extends 
BaseServiceBusTestSupport {
                 
assertTrue(MESSAGE_BODY_PATTERN.matcher(messageBody).matches());
                 Map<String, Object> applicationProperties = 
message.getApplicationProperties();
                 assertEquals(PROPAGATED_HEADERS, applicationProperties);
+                assertEquals(APPLICATION_JSON_CONTENT_TYPE, 
message.getContentType());
                 assertInstanceOf(OffsetDateTime.class, 
message.getScheduledEnqueueTime());
             });
         }

Reply via email to