This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new dd5fd77b029 CAMEL-21672: Use user provided ServiceBusProcessorClient in ServiceBusConsumer if provided
dd5fd77b029 is described below

commit dd5fd77b029aec2dd155ebc76527e373121f7f6a
Author: James Netherton <[email protected]>
AuthorDate: Wed Jan 29 07:30:07 2025 +0000

    CAMEL-21672: Use user provided ServiceBusProcessorClient in ServiceBusConsumer if provided
---
 .../azure/servicebus/ServiceBusConsumer.java          | 19 ++++++++++++-------
 .../azure/servicebus/ServiceBusConsumerTest.java      | 10 ++++++++++
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
index 94ba24c83ee..01a4c543474 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
@@ -52,14 +52,19 @@ public class ServiceBusConsumer extends DefaultConsumer {
         super.doStart();
 
         LOG.debug("Creating connection to Azure ServiceBus");
-        // create client as per sessions
-        if (Boolean.FALSE.equals(getConfiguration().isSessionEnabled())) {
-            client = 
getEndpoint().getServiceBusClientFactory().createServiceBusProcessorClient(getConfiguration(),
-                    this::processMessage, this::processError);
-        } else {
-            client = 
getEndpoint().getServiceBusClientFactory().createServiceBusSessionProcessorClient(getConfiguration(),
-                    this::processMessage, this::processError);
+
+        client = getConfiguration().getProcessorClient();
+        if (client == null) {
+            // create client as per sessions
+            if (Boolean.FALSE.equals(getConfiguration().isSessionEnabled())) {
+                client = 
getEndpoint().getServiceBusClientFactory().createServiceBusProcessorClient(getConfiguration(),
+                        this::processMessage, this::processError);
+            } else {
+                client = 
getEndpoint().getServiceBusClientFactory().createServiceBusSessionProcessorClient(getConfiguration(),
+                        this::processMessage, this::processError);
+            }
         }
+
         client.start();
     }
 
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
index 757cc21f107..914573ff0de 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
@@ -507,6 +507,16 @@ public class ServiceBusConsumerTest {
         }
     }
 
+    @Test
+    void customProcessorClient() throws Exception {
+        ServiceBusProcessorClient customClient = mock();
+        when(configuration.getProcessorClient()).thenReturn(customClient);
+        try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, 
processor)) {
+            consumer.doStart();
+            verify(customClient).start();
+        }
+    }
+
     private void configureMockMessage() {
         when(message.getApplicationProperties()).thenReturn(new HashMap<>());
         
when(message.getBody()).thenReturn(BinaryData.fromBytes(MESSAGE_BODY.getBytes()));

Reply via email to