This is an automated email from the ASF dual-hosted git repository.
oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 9780179 CAMEL-15517: Support iterable of events in Azure EventHubs
component (#4179)
9780179 is described below
commit 97801790ece61614555b7b32d57a8c345134c530
Author: Omar Al-Safi <[email protected]>
AuthorDate: Wed Sep 9 13:50:54 2020 +0200
CAMEL-15517: Support iterable of events in Azure EventHubs component (#4179)
---
.../component/azure/eventhubs/azure-eventhubs.json | 2 +-
.../src/main/docs/azure-eventhubs-component.adoc | 16 +++-
.../operations/EventHubsProducerOperations.java | 48 +++++++++-
.../operations/EventHubsProducerOperationsIT.java | 105 ++++++++++++++++++++-
4 files changed, 161 insertions(+), 10 deletions(-)
diff --git
a/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
b/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
index b564024..7836480 100644
---
a/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
+++
b/components/camel-azure-eventhubs/src/generated/resources/org/apache/camel/component/azure/eventhubs/azure-eventhubs.json
@@ -40,7 +40,7 @@
"partitionId": { "kind": "property", "displayName": "Partition Id",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration",
"configurationField": "configuration", "description": "Sets the identifier of
the Event Hub partition that the {link EventData events} will be sent to. If
the identifier is not spe [...]
"partitionKey": { "kind": "property", "displayName": "Partition Key",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration",
"configurationField": "configuration", "description": "Sets a hashing key to be
provided for the batch of events, which instructs the Event Hubs service to map
this key to a spec [...]
"producerAsyncClient": { "kind": "property", "displayName": "Producer
Async Client", "group": "producer", "label": "producer", "required": false,
"type": "object", "javaType":
"com.azure.messaging.eventhubs.EventHubProducerAsyncClient", "deprecated":
false, "secret": false, "configurationClass":
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration",
"configurationField": "configuration", "description": "Sets the
EventHubProducerAsyncClient.An asynchronous producer respo [...]
- "basicPropertyBinding": { "kind": "property", "displayName": "Basic
Property Binding", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": true, "secret": false,
"defaultValue": false, "description": "Whether the component should use basic
property binding (Camel 2.x) or the newer property binding with additional
capabilities" },
+ "basicPropertyBinding": { "kind": "property", "displayName": "Basic
Property Binding", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "description": "Whether the component should use basic
property binding (Camel 2.x) or the newer property binding with additional
capabilities" },
"connectionString": { "kind": "property", "displayName": "Connection
String", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": true,
"configurationClass":
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration",
"configurationField": "configuration", "description": "Instead of supplying
namespace, sharedAccessKey, sharedAccessName ... etc, you can just supply the
connection string [...]
"sharedAccessKey": { "kind": "property", "displayName": "Shared Access
Key", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": true,
"configurationClass":
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration",
"configurationField": "configuration", "description": "The generated value for
the SharedAccessName" },
"sharedAccessName": { "kind": "property", "displayName": "Shared Access
Name", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "secret": false,
"configurationClass":
"org.apache.camel.component.azure.eventhubs.EventHubsConfiguration",
"configurationField": "configuration", "description": "The name you chose for
your EventHubs SAS keys" }
diff --git
a/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
b/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
index ce08321..ca0c6cb 100644
---
a/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
+++
b/components/camel-azure-eventhubs/src/main/docs/azure-eventhubs-component.adoc
@@ -133,7 +133,7 @@ The Azure Event Hubs component supports 21 options, which
are listed below.
| *partitionId* (producer) | Sets the identifier of the Event Hub partition
that the {link EventData events} will be sent to. If the identifier is not
specified, the Event Hubs service will be responsible for routing events that
are sent to an available partition. | | String
| *partitionKey* (producer) | Sets a hashing key to be provided for the batch
of events, which instructs the Event Hubs service to map this key to a specific
partition. The selection of a partition is stable for a given partition hashing
key. Should any other batches of events be sent using the same exact partition
hashing key, the Event Hubs service will route them all to the same partition.
This should be specified only when there is a need to group events by
partition, but there is fl [...]
| *producerAsyncClient* (producer) | Sets the EventHubProducerAsyncClient.An
asynchronous producer responsible for transmitting EventData to a specific
Event Hub, grouped together in batches. Depending on the {link
CreateBatchOptions options} specified when creating an \{linkEventDataBatch\},
the events may be automatically routed to an available partition or specific to
a partition. Use by this component to produce the data in camel producer. | |
EventHubProducerAsyncClient
-| *basicPropertyBinding* (advanced) | *Deprecated* Whether the component
should use basic property binding (Camel 2.x) or the newer property binding
with additional capabilities | false | boolean
+| *basicPropertyBinding* (advanced) | Whether the component should use basic
property binding (Camel 2.x) or the newer property binding with additional
capabilities | false | boolean
| *connectionString* (security) | Instead of supplying namespace,
sharedAccessKey, sharedAccessName ... etc, you can just supply the connection
string for your eventHub. The connection string for EventHubs already include
all the necessary information to connection to your EventHub. To learn on how
to generate the connection string, take a look at this documentation:
\https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string
| | String
| *sharedAccessKey* (security) | The generated value for the SharedAccessName
| | String
| *sharedAccessName* (security) | The name you chose for your EventHubs SAS
keys | | String
@@ -228,6 +228,20 @@ from("direct:start")
.to("azure-eventhubs:?connectionString=RAW({{connectionString}})"
```
+The component also supports *aggregation* of messages by sending events as
an *iterable* of either Exchanges/Messages or plain data (e.g. a list of
Strings). For example:
+```
+from("direct:start")
+.process(exchange -> {
+ final List<String> messages = new LinkedList<>();
+ messages.add("Test String Message 1");
+ messages.add("Test String Message 2");
+
+ exchange.getIn().setHeader(EventHubsConstants.PARTITION_ID,
firstPartition);
+ exchange.getIn().setBody(messages);
+})
+.to("azure-eventhubs:?connectionString=RAW({{connectionString}})");
+```
+
=== Development Notes (Important)
When developing on this component, you will need to obtain your Azure
accessKey in order to run the integration tests. In addition to the mocked unit
tests
you *will need to run the integration tests with every change you make or even
client upgrade as the Azure client can break things even on minor versions
upgrade.*
diff --git
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
index 9183f03..4037ba2 100644
---
a/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
+++
b/components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
@@ -17,12 +17,16 @@
package org.apache.camel.component.azure.eventhubs.operations;
import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.TypeConverter;
import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
import
org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
import org.apache.camel.util.ObjectHelper;
@@ -97,16 +101,52 @@ public class EventHubsProducerOperations {
.setPartitionKey(partitionKey);
}
+ @SuppressWarnings("unchecked")
private Iterable<EventData> createEventData(final Exchange exchange) {
- final byte[] data = exchange.getIn().getBody(byte[].class);
+ // check whether the exchange body is an iterable of values
+ if (exchange.getIn().getBody() instanceof Iterable) {
+ return createEventDataFromIterable((Iterable<Object>)
exchange.getIn().getBody(),
+ exchange.getContext().getTypeConverter());
+ }
+
+ // we have only a single event here
+ return
Collections.singletonList(createEventDataFromExchange(exchange));
+ }
+
+ private Iterable<EventData> createEventDataFromIterable(final
Iterable<Object> inputData, final TypeConverter converter) {
+ final List<EventData> finalEventData = new LinkedList<>();
+
+ inputData.forEach(data -> {
+ if (data instanceof Exchange) {
+ finalEventData.add(createEventDataFromExchange((Exchange)
data));
+ } else if (data instanceof Message) {
+ finalEventData.add(createEventDataFromMessage((Message) data));
+ } else {
+ finalEventData.add(createEventDataFromObject(data, converter));
+ }
+ });
+
+ return finalEventData;
+ }
+
+ private EventData createEventDataFromExchange(final Exchange exchange) {
+ return createEventDataFromMessage(exchange.getIn());
+ }
+
+ private EventData createEventDataFromMessage(final Message message) {
+ return createEventDataFromObject(message.getBody(),
message.getExchange().getContext().getTypeConverter());
+ }
+
+ private EventData createEventDataFromObject(final Object inputData, final
TypeConverter converter) {
+ final byte[] data = converter.convertTo(byte[].class, inputData);
if (ObjectHelper.isEmpty(data)) {
throw new IllegalArgumentException(
String.format("Cannot convert message body %s to byte[].
You will need "
+ "to make sure the data encoded in byte[]
or add a Camel TypeConverter to convert the data to byte[]",
- exchange.getIn().getBody()));
+ inputData));
}
- // for now we only support single event
- return Collections.singletonList(new EventData(data));
+
+ return new EventData(data);
}
}
diff --git
a/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
b/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
index 03e09f0..e5d7960 100644
---
a/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
+++
b/components/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperationsIT.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.azure.eventhubs.operations;
import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -32,6 +34,7 @@ import
org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -40,6 +43,8 @@ import org.junit.jupiter.api.TestInstance;
class EventHubsProducerOperationsIT extends CamelTestSupport {
private EventHubsConfiguration configuration;
+ private EventHubProducerAsyncClient producerAsyncClient;
+ private EventHubConsumerAsyncClient consumerAsyncClient;
@BeforeAll
public void prepare() throws Exception {
@@ -48,15 +53,14 @@ class EventHubsProducerOperationsIT extends
CamelTestSupport {
configuration = new EventHubsConfiguration();
configuration.setConnectionString(properties.getProperty("connectionString"));
configuration.setConsumerGroupName(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME);
+
+ producerAsyncClient =
EventHubsClientFactory.createEventHubProducerAsyncClient(configuration);
+ consumerAsyncClient =
EventHubsClientFactory.createEventHubConsumerAsyncClient(configuration);
}
@Test
public void testSendEventWithSpecificPartition() {
- final EventHubProducerAsyncClient producerAsyncClient
- =
EventHubsClientFactory.createEventHubProducerAsyncClient(configuration);
final EventHubsProducerOperations operations = new
EventHubsProducerOperations(producerAsyncClient, configuration);
- final EventHubConsumerAsyncClient consumerAsyncClient
- =
EventHubsClientFactory.createEventHubConsumerAsyncClient(configuration);
final String firstPartition =
producerAsyncClient.getPartitionIds().blockLast();
final Exchange exchange = new DefaultExchange(context);
@@ -84,7 +88,100 @@ class EventHubsProducerOperationsIT extends
CamelTestSupport {
return eventExists;
});
+ }
+
+ @Test
+ public void testIterableExchangesSendEventsWithSpecificPartition() {
+ final EventHubsProducerOperations operations = new
EventHubsProducerOperations(producerAsyncClient, configuration);
+ final String firstPartition =
producerAsyncClient.getPartitionIds().blockLast();
+
+ final Exchange exchange1 = new DefaultExchange(context);
+ final Exchange exchange2 = new DefaultExchange(context);
+
+ exchange1.getIn().setBody("Exchange Message 1");
+ exchange2.getIn().setBody("Exchange Message 2");
+
+ final List<Exchange> exchanges = new LinkedList<>();
+ exchanges.add(exchange1);
+ exchanges.add(exchange2);
+
+ final Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody(exchanges);
+
+ operations.sendEvents(exchange, doneSync -> {
+ });
+
+ Awaitility.await()
+ .atMost(40, TimeUnit.SECONDS)
+ .pollDelay(Duration.ofSeconds(2))
+ .pollInterval(Duration.ofSeconds(2))
+ .until(() -> {
+ final Boolean event1Exists = consumerAsyncClient
+ .receiveFromPartition(firstPartition,
EventPosition.earliest())
+ .any(partitionEvent ->
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+ &&
partitionEvent.getData().getBodyAsString()
+ .contains("Exchange Message 1"))
+ .block();
+
+ final Boolean event2Exists = consumerAsyncClient
+ .receiveFromPartition(firstPartition,
EventPosition.earliest())
+ .any(partitionEvent ->
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+ &&
partitionEvent.getData().getBodyAsString()
+ .contains("Exchange Message 2"))
+ .block();
+
+ if (event1Exists == null || event2Exists == null) {
+ return false;
+ }
+
+ return event1Exists && event2Exists;
+ });
+ }
+
+ @Test
+ public void testIterableStringSendEventsWithSpecificPartition() {
+ final EventHubsProducerOperations operations = new
EventHubsProducerOperations(producerAsyncClient, configuration);
+ final String firstPartition =
producerAsyncClient.getPartitionIds().blockLast();
+
+ final List<String> messages = new LinkedList<>();
+ messages.add("Test String Message 1");
+ messages.add("Test String Message 2");
+
+ final Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody(messages);
+
+ operations.sendEvents(exchange, doneSync -> {
+ });
+
+ Awaitility.await()
+ .atMost(40, TimeUnit.SECONDS)
+ .pollDelay(Duration.ofSeconds(2))
+ .pollInterval(Duration.ofSeconds(2))
+ .until(() -> {
+ final Boolean event1Exists = consumerAsyncClient
+ .receiveFromPartition(firstPartition,
EventPosition.earliest())
+ .any(partitionEvent ->
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+ &&
partitionEvent.getData().getBodyAsString()
+ .contains("Test String Message 1"))
+ .block();
+
+ final Boolean event2Exists = consumerAsyncClient
+ .receiveFromPartition(firstPartition,
EventPosition.earliest())
+ .any(partitionEvent ->
partitionEvent.getPartitionContext().getPartitionId().equals(firstPartition)
+ &&
partitionEvent.getData().getBodyAsString()
+ .contains("Test String Message 2"))
+ .block();
+
+ if (event1Exists == null || event2Exists == null) {
+ return false;
+ }
+
+ return event1Exists && event2Exists;
+ });
+ }
+ @AfterAll
+ public void tearDown() {
producerAsyncClient.close();
consumerAsyncClient.close();
}