This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new dd5fd77b029 CAMEL-21672: Use user provided ServiceBusProcessorClient in ServiceBusConsumer if provided
dd5fd77b029 is described below
commit dd5fd77b029aec2dd155ebc76527e373121f7f6a
Author: James Netherton <[email protected]>
AuthorDate: Wed Jan 29 07:30:07 2025 +0000
CAMEL-21672: Use user provided ServiceBusProcessorClient in ServiceBusConsumer if provided
---
.../azure/servicebus/ServiceBusConsumer.java | 19 ++++++++++++-------
.../azure/servicebus/ServiceBusConsumerTest.java | 10 ++++++++++
2 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
index 94ba24c83ee..01a4c543474 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
@@ -52,14 +52,19 @@ public class ServiceBusConsumer extends DefaultConsumer {
super.doStart();
LOG.debug("Creating connection to Azure ServiceBus");
- // create client as per sessions
- if (Boolean.FALSE.equals(getConfiguration().isSessionEnabled())) {
- client =
getEndpoint().getServiceBusClientFactory().createServiceBusProcessorClient(getConfiguration(),
- this::processMessage, this::processError);
- } else {
- client =
getEndpoint().getServiceBusClientFactory().createServiceBusSessionProcessorClient(getConfiguration(),
- this::processMessage, this::processError);
+
+ client = getConfiguration().getProcessorClient();
+ if (client == null) {
+ // create client as per sessions
+ if (Boolean.FALSE.equals(getConfiguration().isSessionEnabled())) {
+ client =
getEndpoint().getServiceBusClientFactory().createServiceBusProcessorClient(getConfiguration(),
+ this::processMessage, this::processError);
+ } else {
+ client =
getEndpoint().getServiceBusClientFactory().createServiceBusSessionProcessorClient(getConfiguration(),
+ this::processMessage, this::processError);
+ }
}
+
client.start();
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
index 757cc21f107..914573ff0de 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
@@ -507,6 +507,16 @@ public class ServiceBusConsumerTest {
}
}
+ @Test
+ void customProcessorClient() throws Exception {
+ ServiceBusProcessorClient customClient = mock();
+ when(configuration.getProcessorClient()).thenReturn(customClient);
+ try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint,
processor)) {
+ consumer.doStart();
+ verify(customClient).start();
+ }
+ }
+
private void configureMockMessage() {
when(message.getApplicationProperties()).thenReturn(new HashMap<>());
when(message.getBody()).thenReturn(BinaryData.fromBytes(MESSAGE_BODY.getBytes()));