This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 5d8dae6ce15 CAMEL-20660: Use synchronous Azure ServiceBusSenderClient
(#14314)
5d8dae6ce15 is described below
commit 5d8dae6ce156dc35fae52f001a5dd3b4910da5f2
Author: Dylan Piergies <[email protected]>
AuthorDate: Wed Jun 5 09:46:20 2024 +0100
CAMEL-20660: Use synchronous Azure ServiceBusSenderClient (#14314)
* CAMEL-20660: Use synchronous Azure ServiceBusSenderClient
The implementation of CAMEL-20266 introduced an issue whereby the async
client would attempt to execute the `AsyncCallback#done` method. When the
consumer is a `ServiceBusConsumer`, this would cause the callback
invocation to fail because it results in `Mono#block` being invoked on a
Reactor thread.
Switching the `ServiceBusProducer` to synchronous operation seems to be
the most straightforward way to address this issue. It is unknown whether
this issue can also impact other consumers.
* Revert camel-spring.xsd
---
.../camel/catalog/components/azure-servicebus.json | 8 +-
.../servicebus/ServiceBusComponentConfigurer.java | 14 +-
.../servicebus/ServiceBusEndpointConfigurer.java | 14 +-
.../servicebus/ServiceBusEndpointUriFactory.java | 2 +-
.../azure/servicebus/azure-servicebus.json | 8 +-
.../src/main/docs/azure-servicebus-component.adoc | 17 +--
.../azure/servicebus/ServiceBusComponent.java | 2 +-
.../azure/servicebus/ServiceBusConfiguration.java | 16 +--
.../azure/servicebus/ServiceBusProducer.java | 158 +++++++--------------
.../servicebus/client/ServiceBusClientFactory.java | 4 +-
.../client/ServiceBusSenderAsyncClientWrapper.java | 133 -----------------
.../operations/ServiceBusSenderOperations.java | 50 +++----
.../operations/ServiceBusSenderOperationsTest.java | 44 +++---
.../AzureServicebusComponentBuilderFactory.java | 16 +--
.../dsl/ServiceBusEndpointBuilderFactory.java | 46 +++---
.../kotlin/components/AzureServicebusUriDsl.kt | 10 +-
16 files changed, 171 insertions(+), 371 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json
index 9192954a44b..b44b54aeefb 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/azure-servicebus.json
@@ -28,7 +28,7 @@
"clientOptions": { "index": 2, "kind": "property", "displayName": "Client
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.util.ClientOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the ClientOptions
to be sent from the client built from this builder, [...]
"configuration": { "index": 3, "kind": "property", "displayName":
"Configuration", "group": "common", "label": "", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"deprecated": false, "autowired": false, "secret": false, "description": "The
component configurations" },
"headerFilterStrategy": { "index": 4, "kind": "property", "displayName":
"Header Filter Strategy", "group": "common", "label": "common", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "To use a custom
HeaderFilterStrategy to filter Serv [...]
- "proxyOptions": { "index": 5, "kind": "property", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderAsyncClient. When a p [...]
+ "proxyOptions": { "index": 5, "kind": "property", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderClient. When a proxy [...]
"serviceBusType": { "index": 6, "kind": "property", "displayName":
"Service Bus Type", "group": "common", "label": "common", "required": true,
"type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusType", "enum": [
"queue", "topic" ], "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": "queue", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "c [...]
"bridgeErrorHandler": { "index": 7, "kind": "property", "displayName":
"Bridge Error Handler", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Allows for bridging the consumer to the Camel routing Error Handler, which
mean any exceptions (if possible) occurred while the Camel consumer is trying
to pickup incoming messages, or the like [...]
"enableDeadLettering": { "index": 8, "kind": "property", "displayName":
"Enable Dead Lettering", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Enable application level
deadlettering to the subscription [...]
@@ -43,7 +43,7 @@
"lazyStartProducer": { "index": 17, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
"producerOperation": { "index": 18, "kind": "property", "displayName":
"Producer Operation", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition",
"enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "sendMessages",
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConf [...]
"scheduledEnqueueTime": { "index": 19, "kind": "property", "displayName":
"Scheduled Enqueue Time", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets OffsetDateTime at
which the message should appear in the S [...]
- "senderAsyncClient": { "index": 20, "kind": "property", "displayName":
"Sender Async Client", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated":
false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets SenderAs [...]
+ "senderClient": { "index": 20, "kind": "property", "displayName": "Sender
Client", "group": "producer", "label": "producer", "required": false, "type":
"object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderClient",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets senderClient to be
used [...]
"serviceBusTransactionContext": { "index": 21, "kind": "property",
"displayName": "Service Bus Transaction Context", "group": "producer", "label":
"producer", "required": false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Represents [...]
"autowiredEnabled": { "index": 22, "kind": "property", "displayName":
"Autowired Enabled", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether autowiring is enabled. This is used for automatic autowiring options
(the option must be marked as autowired) by looking up in the registry to find
if there is a single instance of matching [...]
"connectionString": { "index": 23, "kind": "property", "displayName":
"Connection String", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": true, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the connection
string for a Service Bus namespace or a specific Service Bus [...]
@@ -84,7 +84,7 @@
"amqpTransportType": { "index": 2, "kind": "parameter", "displayName":
"Amqp Transport Type", "group": "common", "label": "common", "required": false,
"type": "object", "javaType": "com.azure.core.amqp.AmqpTransportType", "enum":
[ "Amqp", "AmqpWebSockets" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "AMQP", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": [...]
"clientOptions": { "index": 3, "kind": "parameter", "displayName": "Client
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.util.ClientOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the ClientOptions
to be sent from the client built from this builder [...]
"headerFilterStrategy": { "index": 4, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "common", "label": "common", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "To use a custom
HeaderFilterStrategy to filter Ser [...]
- "proxyOptions": { "index": 5, "kind": "parameter", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderAsyncClient. When a [...]
+ "proxyOptions": { "index": 5, "kind": "parameter", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderClient. When a proxy [...]
"serviceBusType": { "index": 6, "kind": "parameter", "displayName":
"Service Bus Type", "group": "common", "label": "common", "required": true,
"type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusType", "enum": [
"queue", "topic" ], "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": "queue", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": " [...]
"enableDeadLettering": { "index": 7, "kind": "parameter", "displayName":
"Enable Dead Lettering", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Enable application level
deadlettering to the subscription [...]
"maxAutoLockRenewDuration": { "index": 8, "kind": "parameter",
"displayName": "Max Auto Lock Renew Duration", "group": "consumer", "label":
"consumer", "required": false, "type": "object", "javaType":
"java.time.Duration", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "5m", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the amount of time
to continue a [...]
@@ -100,7 +100,7 @@
"binary": { "index": 18, "kind": "parameter", "displayName": "Binary",
"group": "producer", "label": "producer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Set binary mode. If
true, message body will be sent as byte. By default, it is false." },
"producerOperation": { "index": 19, "kind": "parameter", "displayName":
"Producer Operation", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition",
"enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "sendMessages",
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusCon [...]
"scheduledEnqueueTime": { "index": 20, "kind": "parameter", "displayName":
"Scheduled Enqueue Time", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets OffsetDateTime at
which the message should appear in the [...]
- "senderAsyncClient": { "index": 21, "kind": "parameter", "displayName":
"Sender Async Client", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated":
false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets SenderA [...]
+ "senderClient": { "index": 21, "kind": "parameter", "displayName": "Sender
Client", "group": "producer", "label": "producer", "required": false, "type":
"object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderClient",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets senderClient to be
used [...]
"serviceBusTransactionContext": { "index": 22, "kind": "parameter",
"displayName": "Service Bus Transaction Context", "group": "producer", "label":
"producer", "required": false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Represents [...]
"lazyStartProducer": { "index": 23, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
"connectionString": { "index": 24, "kind": "parameter", "displayName":
"Connection String", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": true, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the connection
string for a Service Bus namespace or a specific Service Bus [...]
diff --git
a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java
b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java
index 7d471464963..fd9d5458740 100644
---
a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java
+++
b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusComponentConfigurer.java
@@ -68,8 +68,8 @@ public class ServiceBusComponentConfigurer extends
PropertyConfigurerSupport imp
case "proxyOptions":
getOrCreateConfiguration(target).setProxyOptions(property(camelContext,
com.azure.core.amqp.ProxyOptions.class, value)); return true;
case "scheduledenqueuetime":
case "scheduledEnqueueTime":
getOrCreateConfiguration(target).setScheduledEnqueueTime(property(camelContext,
java.time.OffsetDateTime.class, value)); return true;
- case "senderasyncclient":
- case "senderAsyncClient":
getOrCreateConfiguration(target).setSenderAsyncClient(property(camelContext,
com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.class, value));
return true;
+ case "senderclient":
+ case "senderClient":
getOrCreateConfiguration(target).setSenderClient(property(camelContext,
com.azure.messaging.servicebus.ServiceBusSenderClient.class, value)); return
true;
case "servicebusreceivemode":
case "serviceBusReceiveMode":
getOrCreateConfiguration(target).setServiceBusReceiveMode(property(camelContext,
com.azure.messaging.servicebus.models.ServiceBusReceiveMode.class, value));
return true;
case "servicebustransactioncontext":
@@ -88,7 +88,7 @@ public class ServiceBusComponentConfigurer extends
PropertyConfigurerSupport imp
@Override
public String[] getAutowiredNames() {
- return new String[]{"processorClient", "senderAsyncClient"};
+ return new String[]{"processorClient", "senderClient"};
}
@Override
@@ -132,8 +132,8 @@ public class ServiceBusComponentConfigurer extends
PropertyConfigurerSupport imp
case "proxyOptions": return com.azure.core.amqp.ProxyOptions.class;
case "scheduledenqueuetime":
case "scheduledEnqueueTime": return java.time.OffsetDateTime.class;
- case "senderasyncclient":
- case "senderAsyncClient": return
com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.class;
+ case "senderclient":
+ case "senderClient": return
com.azure.messaging.servicebus.ServiceBusSenderClient.class;
case "servicebusreceivemode":
case "serviceBusReceiveMode": return
com.azure.messaging.servicebus.models.ServiceBusReceiveMode.class;
case "servicebustransactioncontext":
@@ -192,8 +192,8 @@ public class ServiceBusComponentConfigurer extends
PropertyConfigurerSupport imp
case "proxyOptions": return
getOrCreateConfiguration(target).getProxyOptions();
case "scheduledenqueuetime":
case "scheduledEnqueueTime": return
getOrCreateConfiguration(target).getScheduledEnqueueTime();
- case "senderasyncclient":
- case "senderAsyncClient": return
getOrCreateConfiguration(target).getSenderAsyncClient();
+ case "senderclient":
+ case "senderClient": return
getOrCreateConfiguration(target).getSenderClient();
case "servicebusreceivemode":
case "serviceBusReceiveMode": return
getOrCreateConfiguration(target).getServiceBusReceiveMode();
case "servicebustransactioncontext":
diff --git
a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java
b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java
index 66958cf64c8..e0b08731033 100644
---
a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java
+++
b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointConfigurer.java
@@ -62,8 +62,8 @@ public class ServiceBusEndpointConfigurer extends
PropertyConfigurerSupport impl
case "proxyOptions":
target.getConfiguration().setProxyOptions(property(camelContext,
com.azure.core.amqp.ProxyOptions.class, value)); return true;
case "scheduledenqueuetime":
case "scheduledEnqueueTime":
target.getConfiguration().setScheduledEnqueueTime(property(camelContext,
java.time.OffsetDateTime.class, value)); return true;
- case "senderasyncclient":
- case "senderAsyncClient":
target.getConfiguration().setSenderAsyncClient(property(camelContext,
com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.class, value));
return true;
+ case "senderclient":
+ case "senderClient":
target.getConfiguration().setSenderClient(property(camelContext,
com.azure.messaging.servicebus.ServiceBusSenderClient.class, value)); return
true;
case "servicebusreceivemode":
case "serviceBusReceiveMode":
target.getConfiguration().setServiceBusReceiveMode(property(camelContext,
com.azure.messaging.servicebus.models.ServiceBusReceiveMode.class, value));
return true;
case "servicebustransactioncontext":
@@ -82,7 +82,7 @@ public class ServiceBusEndpointConfigurer extends
PropertyConfigurerSupport impl
@Override
public String[] getAutowiredNames() {
- return new String[]{"processorClient", "senderAsyncClient"};
+ return new String[]{"processorClient", "senderClient"};
}
@Override
@@ -127,8 +127,8 @@ public class ServiceBusEndpointConfigurer extends
PropertyConfigurerSupport impl
case "proxyOptions": return com.azure.core.amqp.ProxyOptions.class;
case "scheduledenqueuetime":
case "scheduledEnqueueTime": return java.time.OffsetDateTime.class;
- case "senderasyncclient":
- case "senderAsyncClient": return
com.azure.messaging.servicebus.ServiceBusSenderAsyncClient.class;
+ case "senderclient":
+ case "senderClient": return
com.azure.messaging.servicebus.ServiceBusSenderClient.class;
case "servicebusreceivemode":
case "serviceBusReceiveMode": return
com.azure.messaging.servicebus.models.ServiceBusReceiveMode.class;
case "servicebustransactioncontext":
@@ -188,8 +188,8 @@ public class ServiceBusEndpointConfigurer extends
PropertyConfigurerSupport impl
case "proxyOptions": return
target.getConfiguration().getProxyOptions();
case "scheduledenqueuetime":
case "scheduledEnqueueTime": return
target.getConfiguration().getScheduledEnqueueTime();
- case "senderasyncclient":
- case "senderAsyncClient": return
target.getConfiguration().getSenderAsyncClient();
+ case "senderclient":
+ case "senderClient": return
target.getConfiguration().getSenderClient();
case "servicebusreceivemode":
case "serviceBusReceiveMode": return
target.getConfiguration().getServiceBusReceiveMode();
case "servicebustransactioncontext":
diff --git
a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java
b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java
index d3a6b51d89b..6a60680f4cb 100644
---
a/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java
+++
b/components/camel-azure/camel-azure-servicebus/src/generated/java/org/apache/camel/component/azure/servicebus/ServiceBusEndpointUriFactory.java
@@ -44,7 +44,7 @@ public class ServiceBusEndpointUriFactory extends
org.apache.camel.support.compo
props.add("producerOperation");
props.add("proxyOptions");
props.add("scheduledEnqueueTime");
- props.add("senderAsyncClient");
+ props.add("senderClient");
props.add("serviceBusReceiveMode");
props.add("serviceBusTransactionContext");
props.add("serviceBusType");
diff --git
a/components/camel-azure/camel-azure-servicebus/src/generated/resources/META-INF/org/apache/camel/component/azure/servicebus/azure-servicebus.json
b/components/camel-azure/camel-azure-servicebus/src/generated/resources/META-INF/org/apache/camel/component/azure/servicebus/azure-servicebus.json
index 9192954a44b..b44b54aeefb 100644
---
a/components/camel-azure/camel-azure-servicebus/src/generated/resources/META-INF/org/apache/camel/component/azure/servicebus/azure-servicebus.json
+++
b/components/camel-azure/camel-azure-servicebus/src/generated/resources/META-INF/org/apache/camel/component/azure/servicebus/azure-servicebus.json
@@ -28,7 +28,7 @@
"clientOptions": { "index": 2, "kind": "property", "displayName": "Client
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.util.ClientOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the ClientOptions
to be sent from the client built from this builder, [...]
"configuration": { "index": 3, "kind": "property", "displayName":
"Configuration", "group": "common", "label": "", "required": false, "type":
"object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"deprecated": false, "autowired": false, "secret": false, "description": "The
component configurations" },
"headerFilterStrategy": { "index": 4, "kind": "property", "displayName":
"Header Filter Strategy", "group": "common", "label": "common", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "To use a custom
HeaderFilterStrategy to filter Serv [...]
- "proxyOptions": { "index": 5, "kind": "property", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderAsyncClient. When a p [...]
+ "proxyOptions": { "index": 5, "kind": "property", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderClient. When a proxy [...]
"serviceBusType": { "index": 6, "kind": "property", "displayName":
"Service Bus Type", "group": "common", "label": "common", "required": true,
"type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusType", "enum": [
"queue", "topic" ], "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": "queue", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "c [...]
"bridgeErrorHandler": { "index": 7, "kind": "property", "displayName":
"Bridge Error Handler", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Allows for bridging the consumer to the Camel routing Error Handler, which
mean any exceptions (if possible) occurred while the Camel consumer is trying
to pickup incoming messages, or the like [...]
"enableDeadLettering": { "index": 8, "kind": "property", "displayName":
"Enable Dead Lettering", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Enable application level
deadlettering to the subscription [...]
@@ -43,7 +43,7 @@
"lazyStartProducer": { "index": 17, "kind": "property", "displayName":
"Lazy Start Producer", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether the producer should be started lazy (on the first message). By
starting lazy you can use this to allow CamelContext and routes to startup in
situations where a producer may otherwise fai [...]
"producerOperation": { "index": 18, "kind": "property", "displayName":
"Producer Operation", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition",
"enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "sendMessages",
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConf [...]
"scheduledEnqueueTime": { "index": 19, "kind": "property", "displayName":
"Scheduled Enqueue Time", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets OffsetDateTime at
which the message should appear in the S [...]
- "senderAsyncClient": { "index": 20, "kind": "property", "displayName":
"Sender Async Client", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated":
false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets SenderAs [...]
+ "senderClient": { "index": 20, "kind": "property", "displayName": "Sender
Client", "group": "producer", "label": "producer", "required": false, "type":
"object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderClient",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets senderClient to be
used [...]
"serviceBusTransactionContext": { "index": 21, "kind": "property",
"displayName": "Service Bus Transaction Context", "group": "producer", "label":
"producer", "required": false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Represents [...]
"autowiredEnabled": { "index": 22, "kind": "property", "displayName":
"Autowired Enabled", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether autowiring is enabled. This is used for automatic autowiring options
(the option must be marked as autowired) by looking up in the registry to find
if there is a single instance of matching [...]
"connectionString": { "index": 23, "kind": "property", "displayName":
"Connection String", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": true, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the connection
string for a Service Bus namespace or a specific Service Bus [...]
@@ -84,7 +84,7 @@
"amqpTransportType": { "index": 2, "kind": "parameter", "displayName":
"Amqp Transport Type", "group": "common", "label": "common", "required": false,
"type": "object", "javaType": "com.azure.core.amqp.AmqpTransportType", "enum":
[ "Amqp", "AmqpWebSockets" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "AMQP", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": [...]
"clientOptions": { "index": 3, "kind": "parameter", "displayName": "Client
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.util.ClientOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the ClientOptions
to be sent from the client built from this builder [...]
"headerFilterStrategy": { "index": 4, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "common", "label": "common", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "To use a custom
HeaderFilterStrategy to filter Ser [...]
- "proxyOptions": { "index": 5, "kind": "parameter", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderAsyncClient. When a [...]
+ "proxyOptions": { "index": 5, "kind": "parameter", "displayName": "Proxy
Options", "group": "common", "label": "common", "required": false, "type":
"object", "javaType": "com.azure.core.amqp.ProxyOptions", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the proxy
configuration to use for ServiceBusSenderClient. When a proxy [...]
"serviceBusType": { "index": 6, "kind": "parameter", "displayName":
"Service Bus Type", "group": "common", "label": "common", "required": true,
"type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusType", "enum": [
"queue", "topic" ], "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": "queue", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": " [...]
"enableDeadLettering": { "index": 7, "kind": "parameter", "displayName":
"Enable Dead Lettering", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Enable application level
deadlettering to the subscription [...]
"maxAutoLockRenewDuration": { "index": 8, "kind": "parameter",
"displayName": "Max Auto Lock Renew Duration", "group": "consumer", "label":
"consumer", "required": false, "type": "object", "javaType":
"java.time.Duration", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "5m", "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the amount of time
to continue a [...]
@@ -100,7 +100,7 @@
"binary": { "index": 18, "kind": "parameter", "displayName": "Binary",
"group": "producer", "label": "producer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Set binary mode. If
true, message body will be sent as byte. By default, it is false." },
"producerOperation": { "index": 19, "kind": "parameter", "displayName":
"Producer Operation", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition",
"enum": [ "sendMessages", "scheduleMessages" ], "deprecated": false,
"autowired": false, "secret": false, "defaultValue": "sendMessages",
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusCon [...]
"scheduledEnqueueTime": { "index": 20, "kind": "parameter", "displayName":
"Scheduled Enqueue Time", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType": "java.time.OffsetDateTime", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets OffsetDateTime at
which the message should appear in the [...]
- "senderAsyncClient": { "index": 21, "kind": "parameter", "displayName":
"Sender Async Client", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusSenderAsyncClient", "deprecated":
false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets SenderA [...]
+ "senderClient": { "index": 21, "kind": "parameter", "displayName": "Sender
Client", "group": "producer", "label": "producer", "required": false, "type":
"object", "javaType": "com.azure.messaging.servicebus.ServiceBusSenderClient",
"deprecated": false, "deprecationNote": "", "autowired": true, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets senderClient to be
used [...]
"serviceBusTransactionContext": { "index": 22, "kind": "parameter",
"displayName": "Service Bus Transaction Context", "group": "producer", "label":
"producer", "required": false, "type": "object", "javaType":
"com.azure.messaging.servicebus.ServiceBusTransactionContext", "deprecated":
false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Represents [...]
"lazyStartProducer": { "index": 23, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
"connectionString": { "index": 24, "kind": "parameter", "displayName":
"Connection String", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": true, "configurationClass":
"org.apache.camel.component.azure.servicebus.ServiceBusConfiguration",
"configurationField": "configuration", "description": "Sets the connection
string for a Service Bus namespace or a specific Service Bus [...]
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc
b/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc
index 238f9ea976e..c48ae2fd4d6 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc
+++
b/components/camel-azure/camel-azure-servicebus/src/main/docs/azure-servicebus-component.adoc
@@ -49,11 +49,9 @@ include::partial$component-endpoint-options.adoc[]
// endpoint options: END
-== Async Consumer and Producer
+== Consumer and Producer
-This component implements the async Consumer and producer.
-
-This allows camel route to consume and produce events asynchronously without
blocking any threads.
+This component implements the Consumer and Producer.
== Usage
@@ -82,7 +80,7 @@ See the documentation
https://docs.microsoft.com/en-us/azure/active-directory/au
*Client instance*:
-- You can provide a
`com.azure.messaging.servicebus.ServiceBusSenderAsyncClient` for sending
message and/or `com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient`
to receive messages. If you provide the instances, they will be autowired.
+- You can provide a `com.azure.messaging.servicebus.ServiceBusSenderClient`
for sending message and/or
`com.azure.messaging.servicebus.ServiceBusReceiverClient` to receive messages.
If you provide the instances, they will be autowired.
=== Message Body
In the producer, this component accepts message body of `String`, `byte[]` and
`BinaryData` types or `List<String>`, `List<byte[]>` and `List<BinaryData>` to
send batch messages.
@@ -156,15 +154,6 @@ from("azure-servicebus:test//?connectionString=test")
.to("mock:result");
--------------------------------------------------------------------------------
-- `peekMessages`
-
-[source,java]
---------------------------------------------------------------------------------
-from("azure-servicebus:test//?connectionString=test&consumerOperation=peekMessages&peekNumMaxMessages=3")
- .log("${body}")
- .to("mock:result");
---------------------------------------------------------------------------------
-
include::spring-boot:partial$starter.adoc[]
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
index b7d161bd8a4..8957b1abda7 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusComponent.java
@@ -88,7 +88,7 @@ public class ServiceBusComponent extends DefaultComponent {
}
private void validateConfigurations(final ServiceBusConfiguration
configuration) {
- if (configuration.getProcessorClient() == null ||
configuration.getSenderAsyncClient() == null) {
+ if (configuration.getProcessorClient() == null ||
configuration.getSenderClient() == null) {
if (ObjectHelper.isEmpty(configuration.getConnectionString()) &&
ObjectHelper.isEmpty(configuration.getFullyQualifiedNamespace())) {
throw new IllegalArgumentException(
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
index b25bcebf9ed..a0ddec8bc11 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
@@ -25,7 +25,7 @@ import com.azure.core.amqp.ProxyOptions;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.ClientOptions;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
-import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
+import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
@@ -80,7 +80,7 @@ public class ServiceBusConfiguration implements Cloneable,
HeaderFilterStrategyA
private ServiceBusProducerOperationDefinition producerOperation =
ServiceBusProducerOperationDefinition.sendMessages;
@UriParam(label = "producer")
@Metadata(autowired = true)
- private ServiceBusSenderAsyncClient senderAsyncClient;
+ private ServiceBusSenderClient senderClient;
@UriParam(label = "producer")
private ServiceBusTransactionContext serviceBusTransactionContext;
@UriParam(label = "producer")
@@ -151,7 +151,7 @@ public class ServiceBusConfiguration implements Cloneable,
HeaderFilterStrategyA
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
When a proxy is configured, AMQP_WEB_SOCKETS
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When a
proxy is configured, AMQP_WEB_SOCKETS
* must be used for the transport type.
*/
public ProxyOptions getProxyOptions() {
@@ -280,14 +280,14 @@ public class ServiceBusConfiguration implements
Cloneable, HeaderFilterStrategyA
}
/**
- * Sets SenderAsyncClient to be used in the producer.
+ * Sets senderClient to be used in the producer.
*/
- public ServiceBusSenderAsyncClient getSenderAsyncClient() {
- return senderAsyncClient;
+ public ServiceBusSenderClient getSenderClient() {
+ return senderClient;
}
- public void setSenderAsyncClient(ServiceBusSenderAsyncClient
senderAsyncClient) {
- this.senderAsyncClient = senderAsyncClient;
+ public void setSenderClient(ServiceBusSenderClient senderClient) {
+ this.senderClient = senderClient;
}
/**
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
index 3c2d2912ccd..7b17cad8f6b 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
@@ -23,29 +23,26 @@ import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.azure.core.util.BinaryData;
-import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
-import org.apache.camel.*;
-import
org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
+import com.azure.messaging.servicebus.ServiceBusSenderClient;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
import
org.apache.camel.component.azure.servicebus.operations.ServiceBusSenderOperations;
import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Mono;
-public class ServiceBusProducer extends DefaultAsyncProducer {
+public class ServiceBusProducer extends DefaultProducer {
- private static final Logger LOG =
LoggerFactory.getLogger(ServiceBusProducer.class);
- private final Map<ServiceBusProducerOperationDefinition,
BiConsumer<Exchange, AsyncCallback>> operationsToExecute
+ private final Map<ServiceBusProducerOperationDefinition,
Consumer<Exchange>> operationsToExecute
= new EnumMap<>(ServiceBusProducerOperationDefinition.class);
- private ServiceBusSenderAsyncClientWrapper senderClientWrapper;
+ private ServiceBusSenderClient client;
private ServiceBusConfigurationOptionsProxy configurationOptionsProxy;
private ServiceBusSenderOperations serviceBusSenderOperations;
@@ -69,35 +66,41 @@ public class ServiceBusProducer extends
DefaultAsyncProducer {
super.doStart();
// create the senderClient
- final ServiceBusSenderAsyncClient senderClient =
getConfiguration().getSenderAsyncClient() != null
- ? getConfiguration().getSenderAsyncClient()
- :
getEndpoint().getServiceBusClientFactory().createServiceBusSenderAsyncClient(getConfiguration());
-
- // create the wrapper
- senderClientWrapper = new
ServiceBusSenderAsyncClientWrapper(senderClient);
+ client = getConfiguration().getSenderClient() != null
+ ? getConfiguration().getSenderClient()
+ :
getEndpoint().getServiceBusClientFactory().createServiceBusSenderClient(getConfiguration());
// create the operations
- serviceBusSenderOperations = new
ServiceBusSenderOperations(senderClientWrapper);
+ serviceBusSenderOperations = new ServiceBusSenderOperations(client);
}
@Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- try {
-
invokeOperation(configurationOptionsProxy.getServiceBusProducerOperationDefinition(exchange),
exchange, callback);
- return false;
- } catch (Exception e) {
- exchange.setException(e);
- callback.done(true);
- return true;
+ public void process(Exchange exchange) {
+ final ServiceBusProducerOperationDefinition operation
+ =
configurationOptionsProxy.getServiceBusProducerOperationDefinition(exchange);
+ final ServiceBusProducerOperationDefinition operationsToInvoke;
+
+ // we put sendMessage operation as default in case no operation has
been selected
+ if (ObjectHelper.isEmpty(operation)) {
+ operationsToInvoke =
ServiceBusProducerOperationDefinition.sendMessages;
+ } else {
+ operationsToInvoke = operation;
}
+ final Consumer<Exchange> fnToInvoke =
operationsToExecute.get(operationsToInvoke);
+
+ if (fnToInvoke != null) {
+ fnToInvoke.accept(exchange);
+ } else {
+ throw new RuntimeCamelException("Operation not supported. Value: "
+ operationsToInvoke);
+ }
}
@Override
protected void doStop() throws Exception {
- if (senderClientWrapper != null) {
- // shutdown async client
- senderClientWrapper.close();
+ if (client != null) {
+ // shutdown client
+ client.close();
}
super.doStop();
@@ -112,36 +115,13 @@ public class ServiceBusProducer extends
DefaultAsyncProducer {
return getEndpoint().getConfiguration();
}
- private void bind(ServiceBusProducerOperationDefinition operation,
BiConsumer<Exchange, AsyncCallback> fn) {
+ private void bind(ServiceBusProducerOperationDefinition operation,
Consumer<Exchange> fn) {
operationsToExecute.put(operation, fn);
}
- /**
- * Entry method that selects the appropriate
ServiceBusProducerOperationDefinition operation and executes it
- */
- private void invokeOperation(
- final ServiceBusProducerOperationDefinition operation, final
Exchange exchange, final AsyncCallback callback) {
- final ServiceBusProducerOperationDefinition operationsToInvoke;
-
- // we put sendMessage operation as default in case no operation has
been selected
- if (ObjectHelper.isEmpty(operation)) {
- operationsToInvoke =
ServiceBusProducerOperationDefinition.sendMessages;
- } else {
- operationsToInvoke = operation;
- }
-
- final BiConsumer<Exchange, AsyncCallback> fnToInvoke =
operationsToExecute.get(operationsToInvoke);
-
- if (fnToInvoke != null) {
- fnToInvoke.accept(exchange, callback);
- } else {
- throw new RuntimeCamelException("Operation not supported. Value: "
+ operationsToInvoke);
- }
- }
-
@SuppressWarnings("unchecked")
- private BiConsumer<Exchange, AsyncCallback> sendMessages() {
- return (exchange, callback) -> {
+ private Consumer<Exchange> sendMessages() {
+ return (exchange) -> {
final Object inputBody = exchange.getMessage().getBody();
Map<String, Object> applicationProperties
=
exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES,
Map.class);
@@ -151,31 +131,25 @@ public class ServiceBusProducer extends
DefaultAsyncProducer {
propagateHeaders(exchange, applicationProperties);
final String correlationId =
exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID,
String.class);
- Mono<Void> sendMessageAsync;
-
if (inputBody instanceof Iterable<?>) {
- sendMessageAsync
- =
serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>)
inputBody),
-
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties,
- correlationId);
+
serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>)
inputBody),
+
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties,
+ correlationId);
} else {
Object convertedBody = inputBody instanceof BinaryData ?
inputBody
: getConfiguration().isBinary() ?
convertBodyToBinary(exchange)
: exchange.getMessage().getBody(String.class);
- sendMessageAsync =
serviceBusSenderOperations.sendMessages(convertedBody,
+ serviceBusSenderOperations.sendMessages(convertedBody,
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties,
correlationId);
}
-
- subscribeToMono(sendMessageAsync, exchange, noop -> {
- }, callback);
};
}
@SuppressWarnings("unchecked")
- private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
- return (exchange, callback) -> {
+ private Consumer<Exchange> scheduleMessages() {
+ return (exchange) -> {
final Object inputBody = exchange.getMessage().getBody();
Map<String, Object> applicationProperties
=
exchange.getMessage().getHeader(ServiceBusConstants.APPLICATION_PROPERTIES,
Map.class);
@@ -185,36 +159,27 @@ public class ServiceBusProducer extends
DefaultAsyncProducer {
propagateHeaders(exchange, applicationProperties);
final String correlationId =
exchange.getMessage().getHeader(ServiceBusConstants.CORRELATION_ID,
String.class);
- Mono<List<Long>> scheduleMessagesAsync;
-
if (inputBody instanceof Iterable<?>) {
- scheduleMessagesAsync
- =
serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>)
inputBody),
-
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
-
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
- applicationProperties,
- correlationId);
+
serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>)
inputBody),
+
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
+
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
+ applicationProperties,
+ correlationId);
} else {
Object convertedBody = inputBody instanceof BinaryData ?
inputBody
: getConfiguration().isBinary() ?
convertBodyToBinary(exchange)
: exchange.getMessage().getBody(String.class);
- scheduleMessagesAsync
- =
serviceBusSenderOperations.scheduleMessages(convertedBody,
-
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
-
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
- applicationProperties,
- correlationId);
+ serviceBusSenderOperations.scheduleMessages(convertedBody,
+
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
+
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
+ applicationProperties,
+ correlationId);
}
-
- subscribeToMono(scheduleMessagesAsync, exchange,
- sequenceNumbers ->
exchange.getMessage().setBody(sequenceNumbers), callback);
};
}
private List<?> convertBodyToList(final Iterable<?> inputBody) {
- return StreamSupport.stream(inputBody.spliterator(), false)
- .map(this::convertMessageBody)
- .toList();
+ return StreamSupport.stream(inputBody.spliterator(),
false).map(this::convertMessageBody).toList();
}
private Object convertBodyToBinary(Exchange exchange) {
@@ -257,21 +222,4 @@ public class ServiceBusProducer extends
DefaultAsyncProducer {
exchange))
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
}
-
- private <T> void subscribeToMono(
- final Mono<T> inputMono, final Exchange exchange, final
Consumer<T> resultsCallback, final AsyncCallback callback) {
- inputMono
- .subscribe(resultsCallback, error -> {
- // error but we continue
- if (LOG.isDebugEnabled()) {
- LOG.debug("Error processing async exchange with error:
{}", error.getMessage());
- }
- exchange.setException(error);
- callback.done(false);
- }, () -> {
- // we are done from everything, so mark it as sync done
- LOG.trace("All events with exchange have been sent
successfully.");
- callback.done(false);
- });
- }
}
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java
index 81bd3947319..25fc699ebcd 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusClientFactory.java
@@ -79,9 +79,9 @@ public final class ServiceBusClientFactory {
return processorClientBuilder;
}
- public ServiceBusSenderAsyncClient createServiceBusSenderAsyncClient(final
ServiceBusConfiguration configuration) {
+ public ServiceBusSenderClient createServiceBusSenderClient(final
ServiceBusConfiguration configuration) {
return
createBaseServiceBusSenderClient(createBaseServiceBusClient(configuration),
configuration)
- .buildAsyncClient();
+ .buildClient();
}
public ServiceBusProcessorClient createServiceBusProcessorClient(
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusSenderAsyncClientWrapper.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusSenderAsyncClientWrapper.java
deleted file mode 100644
index be34176aa58..00000000000
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/client/ServiceBusSenderAsyncClientWrapper.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.azure.servicebus.client;
-
-import java.time.OffsetDateTime;
-
-import com.azure.messaging.servicebus.ServiceBusMessage;
-import com.azure.messaging.servicebus.ServiceBusMessageBatch;
-import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
-import com.azure.messaging.servicebus.ServiceBusTransactionContext;
-import com.azure.messaging.servicebus.models.CreateMessageBatchOptions;
-import org.apache.camel.util.ObjectHelper;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-public class ServiceBusSenderAsyncClientWrapper {
-
- private final ServiceBusSenderAsyncClient client;
-
- public ServiceBusSenderAsyncClientWrapper(final
ServiceBusSenderAsyncClient client) {
- ObjectHelper.isNotEmpty(client);
-
- this.client = client;
- }
-
- public String getFullyQualifiedNamespace() {
- return client.getFullyQualifiedNamespace();
- }
-
- public String getEntityPath() {
- return client.getEntityPath();
- }
-
- public Mono<Void> sendMessage(ServiceBusMessage message) {
- return client.sendMessage(message);
- }
-
- public Mono<Void> sendMessage(
- ServiceBusMessage message,
- ServiceBusTransactionContext transactionContext) {
- return client.sendMessage(message, transactionContext);
- }
-
- public Mono<Void> sendMessages(
- Iterable<ServiceBusMessage> messages,
- ServiceBusTransactionContext transactionContext) {
- return client.sendMessages(messages, transactionContext);
- }
-
- public Mono<Void> sendMessages(Iterable<ServiceBusMessage> messages) {
- return client.sendMessages(messages);
- }
-
- public Mono<Void> sendMessages(ServiceBusMessageBatch batch) {
- return client.sendMessages(batch);
- }
-
- public Mono<Void> sendMessages(
- ServiceBusMessageBatch batch,
- ServiceBusTransactionContext transactionContext) {
- return client.sendMessages(batch, transactionContext);
- }
-
- public Mono<ServiceBusMessageBatch> createMessageBatch() {
- return client.createMessageBatch();
- }
-
- public Mono<ServiceBusMessageBatch> createMessageBatch(
- CreateMessageBatchOptions options) {
- return client.createMessageBatch(options);
- }
-
- public Mono<Long> scheduleMessage(
- ServiceBusMessage message, OffsetDateTime scheduledEnqueueTime,
- ServiceBusTransactionContext transactionContext) {
- return client.scheduleMessage(message, scheduledEnqueueTime,
transactionContext);
- }
-
- public Mono<Long> scheduleMessage(ServiceBusMessage message,
OffsetDateTime scheduledEnqueueTime) {
- return client.scheduleMessage(message, scheduledEnqueueTime);
- }
-
- public Flux<Long> scheduleMessages(
- Iterable<ServiceBusMessage> messages,
- OffsetDateTime scheduledEnqueueTime) {
- return client.scheduleMessages(messages, scheduledEnqueueTime);
- }
-
- public Flux<Long> scheduleMessages(
- Iterable<ServiceBusMessage> messages,
- OffsetDateTime scheduledEnqueueTime,
- ServiceBusTransactionContext transactionContext) {
- return client.scheduleMessages(messages, scheduledEnqueueTime,
transactionContext);
- }
-
- public Mono<Void> cancelScheduledMessage(long sequenceNumber) {
- return client.cancelScheduledMessage(sequenceNumber);
- }
-
- public Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers) {
- return client.cancelScheduledMessages(sequenceNumbers);
- }
-
- public Mono<ServiceBusTransactionContext> createTransaction() {
- return client.createTransaction();
- }
-
- public Mono<Void> commitTransaction(ServiceBusTransactionContext
transactionContext) {
- return client.commitTransaction(transactionContext);
- }
-
- public Mono<Void> rollbackTransaction(ServiceBusTransactionContext
transactionContext) {
- return client.rollbackTransaction(transactionContext);
- }
-
- public void close() {
- client.close();
- }
-}
diff --git
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
index c299e9cad32..436f8c5fdad 100644
---
a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
+++
b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/operations/ServiceBusSenderOperations.java
@@ -20,37 +20,37 @@ import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.StreamSupport;
import com.azure.messaging.servicebus.ServiceBusMessage;
+import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import org.apache.camel.component.azure.servicebus.ServiceBusUtils;
-import
org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
import org.apache.camel.util.ObjectHelper;
-import reactor.core.publisher.Mono;
public class ServiceBusSenderOperations {
- private final ServiceBusSenderAsyncClientWrapper client;
+ private final ServiceBusSenderClient client;
- public ServiceBusSenderOperations(ServiceBusSenderAsyncClientWrapper
client) {
+ public ServiceBusSenderOperations(ServiceBusSenderClient client) {
ObjectHelper.notNull(client, "client");
this.client = client;
}
- public Mono<Void> sendMessages(
+ public void sendMessages(
final Object data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties,
final String correlationId) {
if (data instanceof Iterable<?>) {
- return sendMessages((Iterable<?>) data, context,
applicationProperties, correlationId);
+ sendMessages((Iterable<?>) data, context, applicationProperties,
correlationId);
+ } else {
+ sendMessage(data, context, applicationProperties, correlationId);
}
-
- return sendMessage(data, context, applicationProperties,
correlationId);
}
- public Mono<List<Long>> scheduleMessages(
+ public List<Long> scheduleMessages(
final Object data,
final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
@@ -67,7 +67,7 @@ public class ServiceBusSenderOperations {
return scheduleMessage(data, scheduledEnqueueTime, context,
applicationProperties, correlationId);
}
- private Mono<Void> sendMessages(
+ private void sendMessages(
final Iterable<?> data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties,
@@ -76,13 +76,13 @@ public class ServiceBusSenderOperations {
= ServiceBusUtils.createServiceBusMessages(data,
applicationProperties, correlationId);
if (ObjectHelper.isEmpty(context)) {
- return client.sendMessages(messages);
+ client.sendMessages(messages);
+ } else {
+ client.sendMessages(messages, context);
}
-
- return client.sendMessages(messages, context);
}
- private Mono<Void> sendMessage(
+ private void sendMessage(
final Object data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties,
@@ -90,13 +90,13 @@ public class ServiceBusSenderOperations {
final ServiceBusMessage message =
ServiceBusUtils.createServiceBusMessage(data, applicationProperties,
correlationId);
if (ObjectHelper.isEmpty(context)) {
- return client.sendMessage(message);
+ client.sendMessage(message);
+ } else {
+ client.sendMessage(message, context);
}
-
- return client.sendMessage(message, context);
}
- private Mono<List<Long>> scheduleMessage(
+ private List<Long> scheduleMessage(
final Object data,
final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
@@ -105,15 +105,13 @@ public class ServiceBusSenderOperations {
final ServiceBusMessage message =
ServiceBusUtils.createServiceBusMessage(data, applicationProperties,
correlationId);
if (ObjectHelper.isEmpty(context)) {
- return client.scheduleMessage(message, scheduledEnqueueTime)
- .map(Collections::singletonList);
+ return Collections.singletonList(client.scheduleMessage(message,
scheduledEnqueueTime));
}
- return client.scheduleMessage(message, scheduledEnqueueTime, context)
- .map(Collections::singletonList);
+ return Collections.singletonList(client.scheduleMessage(message,
scheduledEnqueueTime, context));
}
- private Mono<List<Long>> scheduleMessages(
+ private List<Long> scheduleMessages(
final Iterable<?> data, final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties,
@@ -122,11 +120,9 @@ public class ServiceBusSenderOperations {
= ServiceBusUtils.createServiceBusMessages(data,
applicationProperties, correlationId);
if (ObjectHelper.isEmpty(context)) {
- return client.scheduleMessages(messages, scheduledEnqueueTime)
- .collectList();
+ return StreamSupport.stream(client.scheduleMessages(messages,
scheduledEnqueueTime).spliterator(), false).toList();
}
- return client.scheduleMessages(messages, scheduledEnqueueTime, context)
- .collectList();
+ return StreamSupport.stream(client.scheduleMessages(messages,
scheduledEnqueueTime, context).spliterator(), false).toList();
}
}
diff --git
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
index 2de9ed1b146..4e495f36c57 100644
---
a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
+++
b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/integration/operations/ServiceBusSenderOperationsTest.java
@@ -24,9 +24,8 @@ import java.util.concurrent.TimeUnit;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
-import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
+import com.azure.messaging.servicebus.ServiceBusSenderClient;
import org.apache.camel.builder.RouteBuilder;
-import
org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
import
org.apache.camel.component.azure.servicebus.integration.BaseServiceBusTestSupport;
import
org.apache.camel.component.azure.servicebus.operations.ServiceBusSenderOperations;
import org.junit.jupiter.api.AfterAll;
@@ -39,22 +38,21 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@EnabledIfSystemProperty(named =
BaseServiceBusTestSupport.CONNECTION_STRING_PROPERTY_NAME, matches = ".*",
- disabledReason = "Service Bus connection string must
be supplied to run this test, e.g: mvn verify -D"
- +
BaseServiceBusTestSupport.CONNECTION_STRING_PROPERTY_NAME + "=connectionString")
+ disabledReason = "Service Bus connection string must be supplied to
run this test, e.g: mvn verify -D"
+ + BaseServiceBusTestSupport.CONNECTION_STRING_PROPERTY_NAME +
"=connectionString")
public class ServiceBusSenderOperationsTest extends BaseServiceBusTestSupport {
- private static ServiceBusSenderAsyncClientWrapper clientSenderWrapper;
+ private static ServiceBusSenderClient client;
@BeforeAll
static void prepare() {
- final ServiceBusSenderAsyncClient senderClient = new
ServiceBusClientBuilder().connectionString(CONNECTION_STRING)
- .sender().topicName(TOPIC_NAME).buildAsyncClient();
- clientSenderWrapper = new
ServiceBusSenderAsyncClientWrapper(senderClient);
+ client = new
ServiceBusClientBuilder().connectionString(CONNECTION_STRING)
+ .sender().topicName(TOPIC_NAME).buildClient();
}
@AfterAll
static void closeClient() {
- clientSenderWrapper.close();
+ client.close();
}
@BeforeEach
@@ -64,15 +62,15 @@ public class ServiceBusSenderOperationsTest extends
BaseServiceBusTestSupport {
@Test
void testSendSingleMessage() throws InterruptedException {
- final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(clientSenderWrapper);
+ final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(client);
messageLatch = new CountDownLatch(2);
try (ServiceBusProcessorClient processorClient =
createTopicProcessorClient()) {
processorClient.start();
- operations.sendMessages("test data", null, Map.of("customKey",
"customValue"), null).block();
+ operations.sendMessages("test data", null, Map.of("customKey",
"customValue"), null);
//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
- operations.sendMessages(testByteBody, null, Map.of("customKey",
"customValue"), null).block();
+ operations.sendMessages(testByteBody, null, Map.of("customKey",
"customValue"), null);
assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS));
@@ -87,13 +85,13 @@ public class ServiceBusSenderOperationsTest extends
BaseServiceBusTestSupport {
// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
- operations.sendMessages(12345, null, null, null).block();
+ operations.sendMessages(12345, null, null, null);
});
}
@Test
void testSendingBatchMessages() throws InterruptedException {
- final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(clientSenderWrapper);
+ final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(client);
messageLatch = new CountDownLatch(5);
try (ServiceBusProcessorClient processorClient =
createTopicProcessorClient()) {
@@ -104,14 +102,14 @@ public class ServiceBusSenderOperationsTest extends
BaseServiceBusTestSupport {
inputBatch.add("test batch 2");
inputBatch.add("test batch 3");
- operations.sendMessages(inputBatch, null, null, null).block();
+ operations.sendMessages(inputBatch, null, null, null);
//test bytes
final List<byte[]> inputBatch2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);
- operations.sendMessages(inputBatch2, null, null, null).block();
+ operations.sendMessages(inputBatch2, null, null, null);
assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS));
@@ -138,16 +136,16 @@ public class ServiceBusSenderOperationsTest extends
BaseServiceBusTestSupport {
@Test
void testScheduleMessage() throws InterruptedException {
- final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(clientSenderWrapper);
+ final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(client);
messageLatch = new CountDownLatch(2);
try (ServiceBusProcessorClient processorClient =
createTopicProcessorClient()) {
processorClient.start();
- operations.scheduleMessages("testScheduleMessage",
OffsetDateTime.now(), null, null, null).block();
+ operations.scheduleMessages("testScheduleMessage",
OffsetDateTime.now(), null, null, null);
//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
- operations.scheduleMessages(testByteBody, OffsetDateTime.now(),
null, null, null).block();
+ operations.scheduleMessages(testByteBody, OffsetDateTime.now(),
null, null, null);
assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS));
@@ -161,13 +159,13 @@ public class ServiceBusSenderOperationsTest extends
BaseServiceBusTestSupport {
}
// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
- operations.scheduleMessages(12345, OffsetDateTime.now(), null,
null, null).block();
+ operations.scheduleMessages(12345, OffsetDateTime.now(), null,
null, null);
});
}
@Test
void testSchedulingBatchMessages() throws InterruptedException {
- final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(clientSenderWrapper);
+ final ServiceBusSenderOperations operations = new
ServiceBusSenderOperations(client);
messageLatch = new CountDownLatch(5);
try (ServiceBusProcessorClient processorClient =
createTopicProcessorClient()) {
@@ -177,14 +175,14 @@ public class ServiceBusSenderOperationsTest extends
BaseServiceBusTestSupport {
inputBatch.add("testSchedulingBatchMessages 1");
inputBatch.add("testSchedulingBatchMessages 2");
inputBatch.add("testSchedulingBatchMessages 3");
- operations.scheduleMessages(inputBatch, OffsetDateTime.now(),
null, null, null).block();
+ operations.scheduleMessages(inputBatch, OffsetDateTime.now(),
null, null, null);
//test bytes
final List<byte[]> inputBatch2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);
- operations.scheduleMessages(inputBatch2, OffsetDateTime.now(),
null, null, null).block();
+ operations.scheduleMessages(inputBatch2, OffsetDateTime.now(),
null, null, null);
assertTrue(messageLatch.await(3000, TimeUnit.MILLISECONDS));
diff --git
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureServicebusComponentBuilderFactory.java
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureServicebusComponentBuilderFactory.java
index 271bc9e99ed..33df60137e8 100644
---
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureServicebusComponentBuilderFactory.java
+++
b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/AzureServicebusComponentBuilderFactory.java
@@ -139,8 +139,8 @@ public interface AzureServicebusComponentBuilderFactory {
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
- * When a proxy is configured, AMQP_WEB_SOCKETS must be used for the
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When
+ * a proxy is configured, AMQP_WEB_SOCKETS must be used for the
* transport type.
*
* The option is a:
@@ -426,18 +426,18 @@ public interface AzureServicebusComponentBuilderFactory {
}
/**
- * Sets SenderAsyncClient to be used in the producer.
+ * Sets senderClient to be used in the producer.
*
* The option is a:
- *
<code>com.azure.messaging.servicebus.ServiceBusSenderAsyncClient</code>
type.
+ *
<code>com.azure.messaging.servicebus.ServiceBusSenderClient</code>
type.
*
* Group: producer
*
- * @param senderAsyncClient the value to set
+ * @param senderClient the value to set
* @return the dsl builder
*/
- default AzureServicebusComponentBuilder
senderAsyncClient(com.azure.messaging.servicebus.ServiceBusSenderAsyncClient
senderAsyncClient) {
- doSetProperty("senderAsyncClient", senderAsyncClient);
+ default AzureServicebusComponentBuilder
senderClient(com.azure.messaging.servicebus.ServiceBusSenderClient
senderClient) {
+ doSetProperty("senderClient", senderClient);
return this;
}
@@ -586,7 +586,7 @@ public interface AzureServicebusComponentBuilderFactory {
case "lazyStartProducer": ((ServiceBusComponent)
component).setLazyStartProducer((boolean) value); return true;
case "producerOperation":
getOrCreateConfiguration((ServiceBusComponent)
component).setProducerOperation((org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition)
value); return true;
case "scheduledEnqueueTime":
getOrCreateConfiguration((ServiceBusComponent)
component).setScheduledEnqueueTime((java.time.OffsetDateTime) value); return
true;
- case "senderAsyncClient":
getOrCreateConfiguration((ServiceBusComponent)
component).setSenderAsyncClient((com.azure.messaging.servicebus.ServiceBusSenderAsyncClient)
value); return true;
+ case "senderClient":
getOrCreateConfiguration((ServiceBusComponent)
component).setSenderClient((com.azure.messaging.servicebus.ServiceBusSenderClient)
value); return true;
case "serviceBusTransactionContext":
getOrCreateConfiguration((ServiceBusComponent)
component).setServiceBusTransactionContext((com.azure.messaging.servicebus.ServiceBusTransactionContext)
value); return true;
case "autowiredEnabled": ((ServiceBusComponent)
component).setAutowiredEnabled((boolean) value); return true;
case "connectionString":
getOrCreateConfiguration((ServiceBusComponent)
component).setConnectionString((java.lang.String) value); return true;
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ServiceBusEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ServiceBusEndpointBuilderFactory.java
index 2059e6faaad..94d03c04d00 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ServiceBusEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/ServiceBusEndpointBuilderFactory.java
@@ -175,8 +175,8 @@ public interface ServiceBusEndpointBuilderFactory {
return this;
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
- * When a proxy is configured, AMQP_WEB_SOCKETS must be used for the
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When
+ * a proxy is configured, AMQP_WEB_SOCKETS must be used for the
* transport type.
*
* The option is a: <code>com.azure.core.amqp.ProxyOptions</code> type.
@@ -191,8 +191,8 @@ public interface ServiceBusEndpointBuilderFactory {
return this;
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
- * When a proxy is configured, AMQP_WEB_SOCKETS must be used for the
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When
+ * a proxy is configured, AMQP_WEB_SOCKETS must be used for the
* transport type.
*
* The option will be converted to a
@@ -856,8 +856,8 @@ public interface ServiceBusEndpointBuilderFactory {
return this;
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
- * When a proxy is configured, AMQP_WEB_SOCKETS must be used for the
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When
+ * a proxy is configured, AMQP_WEB_SOCKETS must be used for the
* transport type.
*
* The option is a: <code>com.azure.core.amqp.ProxyOptions</code> type.
@@ -872,8 +872,8 @@ public interface ServiceBusEndpointBuilderFactory {
return this;
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
- * When a proxy is configured, AMQP_WEB_SOCKETS must be used for the
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When
+ * a proxy is configured, AMQP_WEB_SOCKETS must be used for the
* transport type.
*
* The option will be converted to a
@@ -1020,33 +1020,35 @@ public interface ServiceBusEndpointBuilderFactory {
return this;
}
/**
- * Sets SenderAsyncClient to be used in the producer.
+ * Sets senderClient to be used in the producer.
*
* The option is a:
- *
<code>com.azure.messaging.servicebus.ServiceBusSenderAsyncClient</code> type.
+ * <code>com.azure.messaging.servicebus.ServiceBusSenderClient</code>
+ * type.
*
* Group: producer
*
- * @param senderAsyncClient the value to set
+ * @param senderClient the value to set
* @return the dsl builder
*/
- default ServiceBusEndpointProducerBuilder
senderAsyncClient(com.azure.messaging.servicebus.ServiceBusSenderAsyncClient
senderAsyncClient) {
- doSetProperty("senderAsyncClient", senderAsyncClient);
+ default ServiceBusEndpointProducerBuilder
senderClient(com.azure.messaging.servicebus.ServiceBusSenderClient
senderClient) {
+ doSetProperty("senderClient", senderClient);
return this;
}
/**
- * Sets SenderAsyncClient to be used in the producer.
+ * Sets senderClient to be used in the producer.
*
* The option will be converted to a
- *
<code>com.azure.messaging.servicebus.ServiceBusSenderAsyncClient</code> type.
+ * <code>com.azure.messaging.servicebus.ServiceBusSenderClient</code>
+ * type.
*
* Group: producer
*
- * @param senderAsyncClient the value to set
+ * @param senderClient the value to set
* @return the dsl builder
*/
- default ServiceBusEndpointProducerBuilder senderAsyncClient(String
senderAsyncClient) {
- doSetProperty("senderAsyncClient", senderAsyncClient);
+ default ServiceBusEndpointProducerBuilder senderClient(String
senderClient) {
+ doSetProperty("senderClient", senderClient);
return this;
}
/**
@@ -1373,8 +1375,8 @@ public interface ServiceBusEndpointBuilderFactory {
return this;
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
- * When a proxy is configured, AMQP_WEB_SOCKETS must be used for the
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When
+ * a proxy is configured, AMQP_WEB_SOCKETS must be used for the
* transport type.
*
* The option is a: <code>com.azure.core.amqp.ProxyOptions</code> type.
@@ -1389,8 +1391,8 @@ public interface ServiceBusEndpointBuilderFactory {
return this;
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient.
- * When a proxy is configured, AMQP_WEB_SOCKETS must be used for the
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When
+ * a proxy is configured, AMQP_WEB_SOCKETS must be used for the
* transport type.
*
* The option will be converted to a
diff --git
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/AzureServicebusUriDsl.kt
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/AzureServicebusUriDsl.kt
index 8e1d5010676..3f0103e635b 100644
---
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/AzureServicebusUriDsl.kt
+++
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/AzureServicebusUriDsl.kt
@@ -86,8 +86,8 @@ public class AzureServicebusUriDsl(
}
/**
- * Sets the proxy configuration to use for ServiceBusSenderAsyncClient. When
a proxy is
- * configured, AMQP_WEB_SOCKETS must be used for the transport type.
+ * Sets the proxy configuration to use for ServiceBusSenderClient. When a
proxy is configured,
+ * AMQP_WEB_SOCKETS must be used for the transport type.
*/
public fun proxyOptions(proxyOptions: String) {
it.property("proxyOptions", proxyOptions)
@@ -266,10 +266,10 @@ public class AzureServicebusUriDsl(
}
/**
- * Sets SenderAsyncClient to be used in the producer.
+ * Sets senderClient to be used in the producer.
*/
- public fun senderAsyncClient(senderAsyncClient: String) {
- it.property("senderAsyncClient", senderAsyncClient)
+ public fun senderClient(senderClient: String) {
+ it.property("senderClient", senderClient)
}
/**