This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c329be1c53b CAMEL-21679: Fix azure-servicebus configuration validation
so that individual client options can be used
c329be1c53b is described below
commit c329be1c53b6d93f4c5eee73026f8fe1a917fab9
Author: James Netherton <[email protected]>
AuthorDate: Wed Jan 29 12:01:06 2025 +0000
CAMEL-21679: Fix azure-servicebus configuration validation so that
individual client options can be used
---
.../azure/servicebus/ServiceBusComponent.java | 11 -----
.../azure/servicebus/ServiceBusConsumer.java | 6 +++
.../azure/servicebus/ServiceBusProducer.java | 1 +
.../azure/servicebus/ServiceBusUtils.java | 13 ++++++
.../azure/servicebus/ServiceBusEndpointTest.java | 29 +++++++++---
.../azure/servicebus/ServiceBusUtilsTest.java | 51 ++++++++++++++++++++++
6 files changed, 94 insertions(+), 17 deletions(-)
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
index 8957b1abda7..0f1af11f787 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
@@ -58,7 +58,6 @@ public class ServiceBusComponent extends DefaultComponent {
final ServiceBusEndpoint endpoint = new ServiceBusEndpoint(uri, this,
configuration);
setProperties(endpoint, parameters);
setCredentials(configuration);
- validateConfigurations(configuration);
return endpoint;
}
@@ -86,14 +85,4 @@ public class ServiceBusComponent extends DefaultComponent {
public void setConfiguration(ServiceBusConfiguration configuration) {
this.configuration = configuration;
}
-
- private void validateConfigurations(final ServiceBusConfiguration
configuration) {
- if (configuration.getProcessorClient() == null ||
configuration.getSenderClient() == null) {
- if (ObjectHelper.isEmpty(configuration.getConnectionString()) &&
-
ObjectHelper.isEmpty(configuration.getFullyQualifiedNamespace())) {
- throw new IllegalArgumentException(
- "Azure ServiceBus ConnectionString or FQNS must be
specified.");
- }
- }
- }
}
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
index 01a4c543474..3c32ad9f2dd 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
@@ -47,6 +47,12 @@ public class ServiceBusConsumer extends DefaultConsumer {
super(endpoint, processor);
}
+ @Override
+ protected void doInit() throws Exception {
+ super.doInit();
+ ServiceBusUtils.validateConfiguration(getConfiguration(), true);
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
index 9dc3b58f1d2..5169ef67324 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
@@ -58,6 +58,7 @@ public class ServiceBusProducer extends DefaultProducer {
@Override
protected void doInit() throws Exception {
super.doInit();
+ ServiceBusUtils.validateConfiguration(getConfiguration(), false);
configurationOptionsProxy = new
ServiceBusConfigurationOptionsProxy(getConfiguration());
}
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
index 7d553b94b54..db4a1a1ef7a 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusUtils.java
@@ -61,4 +61,17 @@ public final class ServiceBusUtils {
.map(obj -> createServiceBusMessage(obj,
applicationProperties, correlationId, sessionId))
.collect(Collectors.toList());
}
+
+ public static void validateConfiguration(final ServiceBusConfiguration
configuration, final boolean isConsumer) {
+ final boolean customClientAbsent
+ = isConsumer ? configuration.getProcessorClient() == null :
configuration.getSenderClient() == null;
+ if (customClientAbsent &&
isConnectionStringOrFQNSAbsent(configuration)) {
+ throw new IllegalArgumentException("Azure ServiceBus
ConnectionString or FQNS must be specified.");
+ }
+ }
+
+ static boolean isConnectionStringOrFQNSAbsent(final
ServiceBusConfiguration configuration) {
+ return ObjectHelper.isEmpty(configuration.getConnectionString())
+ &&
ObjectHelper.isEmpty(configuration.getFullyQualifiedNamespace());
+ }
}
diff --git
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java
index a2b291f849f..fdc10d7b7b2 100644
---
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java
+++
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointTest.java
@@ -19,11 +19,15 @@ package org.apache.camel.component.azure.servicebus;
import java.util.HashMap;
import java.util.Map;
+import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
+import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.DefaultAzureCredentialBuilder;
+import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.ResolveEndpointFailedException;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
import static org.junit.jupiter.api.Assertions.*;
@@ -31,15 +35,28 @@ class ServiceBusEndpointTest extends CamelTestSupport {
@Test
void testCreateWithInvalidData() {
- assertThrows(ResolveEndpointFailedException.class,
- () -> context.getEndpoint("azure-servicebus:test//?"));
+ Exception exception =
assertThrows(FailedToCreateProducerException.class, () -> {
+ template.sendBody("azure-servicebus:test//?", null);
+ });
+ assertInstanceOf(IllegalArgumentException.class, exception.getCause());
- assertThrows(ResolveEndpointFailedException.class,
- () ->
context.getEndpoint("azure-servicebus://?connectionString=test"));
+ exception = assertThrows(ResolveEndpointFailedException.class, () -> {
+ template.sendBody("azure-servicebus://?connectionString=test",
null);
+ });
+ assertInstanceOf(IllegalArgumentException.class, exception.getCause());
// provided credential but no fully qualified namespace
- assertThrows(ResolveEndpointFailedException.class,
- () ->
context.getEndpoint("azure-servicebus:test?tokenCredential=credential"));
+ context.getRegistry().bind("credential", new TokenCredential() {
+ @Override
+ public Mono<AccessToken> getToken(TokenRequestContext
tokenRequestContext) {
+ return Mono.empty();
+ }
+ });
+
+ exception = assertThrows(FailedToCreateProducerException.class, () -> {
+
template.sendBody("azure-servicebus:test?tokenCredential=#credential", null);
+ });
+ assertInstanceOf(IllegalArgumentException.class, exception.getCause());
}
@Test
diff --git
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
index 0a97080d37e..5c80d003363 100644
---
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
+++
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusUtilsTest.java
@@ -22,7 +22,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.stream.StreamSupport;
+import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusMessage;
+import com.azure.messaging.servicebus.ServiceBusProcessorClient;
+import com.azure.messaging.servicebus.ServiceBusSenderClient;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
@@ -133,4 +136,52 @@ public class ServiceBusUtilsTest {
assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(record ->
record.getSessionId().equals("session-2")));
}
+
+ @Test
+ void validateConfigurationMissingCredentials() {
+ assertThrows(IllegalArgumentException.class,
+ () -> ServiceBusUtils.validateConfiguration(new
ServiceBusConfiguration(), false));
+ }
+
+ @Test
+ void validateConfigurationConnectionStringProvided() {
+ ServiceBusConfiguration configuration = new ServiceBusConfiguration();
+ configuration.setConnectionString("test");
+ assertDoesNotThrow(() ->
ServiceBusUtils.validateConfiguration(configuration, false));
+ }
+
+ @Test
+ void validateConfigurationFQNSProvided() {
+ ServiceBusConfiguration configuration = new ServiceBusConfiguration();
+ configuration.setFullyQualifiedNamespace("test");
+ assertDoesNotThrow(() ->
ServiceBusUtils.validateConfiguration(configuration, false));
+ }
+
+ @Test
+ void validateConfigurationCustomProcessorClient() {
+ ServiceBusConfiguration configuration = new ServiceBusConfiguration();
+ ServiceBusProcessorClient client = new ServiceBusClientBuilder()
+
.connectionString("Endpoint=sb://camel.apache.org/;SharedAccessKeyName=test;SharedAccessKey=test")
+ .processor()
+ .queueName("test")
+ .processMessage(serviceBusReceivedMessageContext -> {
+ })
+ .processError(serviceBusErrorContext -> {
+ })
+ .buildProcessorClient();
+ configuration.setProcessorClient(client);
+ assertDoesNotThrow(() ->
ServiceBusUtils.validateConfiguration(configuration, true));
+ }
+
+ @Test
+ void validateConfigurationCustomSenderClient() {
+ ServiceBusConfiguration configuration = new ServiceBusConfiguration();
+ ServiceBusSenderClient client = new ServiceBusClientBuilder()
+
.connectionString("Endpoint=sb://camel.apache.org/;SharedAccessKeyName=test;SharedAccessKey=test")
+ .sender()
+ .queueName("test")
+ .buildClient();
+ configuration.setSenderClient(client);
+ assertDoesNotThrow(() ->
ServiceBusUtils.validateConfiguration(configuration, false));
+ }
}