This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 7e623033f3 Increase azure-eventhubs test coverage
7e623033f3 is described below
commit 7e623033f3a582a1f49251df1f0fe50cb40798dc
Author: James Netherton <[email protected]>
AuthorDate: Thu Aug 22 07:12:58 2024 +0100
Increase azure-eventhubs test coverage
Fixes #6367
---
.../azure/azure-eventhubs/pom.xml | 25 +-
.../azure/eventhubs/it/AzureCredentialsHelper.java | 88 +++++
.../eventhubs/it/AzureEventhubsProducers.java | 57 +++
.../azure/eventhubs/it/AzureEventhubsResource.java | 120 ++++--
.../azure/eventhubs/it/AzureEventhubsRoutes.java | 192 +++++++++-
.../eventhubs/it/InMemoryCheckpointStore.java | 134 +++++++
.../src/main/resources/application.properties | 22 ++
.../azure/eventhubs/it/AzureEventhubsIT.java | 2 +-
.../azure/eventhubs/it/AzureEventhubsTest.java | 405 ++++++++++++++++++++-
integration-test-groups/azure/azure-resources.sh | 2 +-
10 files changed, 976 insertions(+), 71 deletions(-)
diff --git a/integration-test-groups/azure/azure-eventhubs/pom.xml b/integration-test-groups/azure/azure-eventhubs/pom.xml
index 35015cc397..099adb9a07 100644
--- a/integration-test-groups/azure/azure-eventhubs/pom.xml
+++ b/integration-test-groups/azure/azure-eventhubs/pom.xml
@@ -33,15 +33,15 @@
<dependencies>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-azure-eventhubs</artifactId>
+ <artifactId>camel-quarkus-direct</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-mock</artifactId>
+ <artifactId>camel-quarkus-azure-eventhubs</artifactId>
</dependency>
<dependency>
- <groupId>io.quarkus</groupId>
- <artifactId>quarkus-quartz</artifactId>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-mock</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -51,6 +51,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jsonb</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-integration-test-support</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
@@ -107,6 +111,19 @@
</activation>
<dependencies>
<!-- The following dependencies guarantee that this module is
built after them. You can update them by running `mvn process-resources
-Pformat -N` from the source tree root directory -->
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-azure-eventhubs-deployment</artifactId>
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureCredentialsHelper.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureCredentialsHelper.java
new file mode 100644
index 0000000000..df9687c78a
--- /dev/null
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureCredentialsHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.azure.eventhubs.it;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.azure.core.amqp.implementation.ConnectionStringProperties;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+
+public final class AzureCredentialsHelper {
+ private AzureCredentialsHelper() {
+ // Utility class
+ }
+
+ public static boolean isMinimumConfigurationAvailable() {
+ Config config = ConfigProvider.getConfig();
+ if (isMockBackEnd()) {
+ return false;
+ }
+ Optional<String> storageAccountName =
config.getOptionalValue("azure.storage.account-name", String.class);
+ Optional<String> storageAccountKey =
config.getOptionalValue("azure.storage.account-key", String.class);
+ Optional<String> connectionString =
config.getOptionalValue("azure.event.hubs.connection.string", String.class);
+ return storageAccountName.isPresent() && storageAccountKey.isPresent()
&& connectionString.isPresent();
+ }
+
+ public static boolean isAzureIdentityCredentialsAvailable() {
+ Config config = ConfigProvider.getConfig();
+ if (isMockBackEnd()) {
+ return false;
+ }
+
+ Optional<String> clientId = config.getOptionalValue("azure.client.id",
String.class);
+ Optional<String> tenantId = config.getOptionalValue("azure.tenant.id",
String.class);
+ Optional<String> username = config.getOptionalValue("azure.username",
String.class);
+ Optional<String> password = config.getOptionalValue("azure.password",
String.class);
+ Optional<String> clientSecret =
config.getOptionalValue("azure.client.secret", String.class);
+ Optional<String> clientCertificate =
config.getOptionalValue("azure.client.certificate.path", String.class);
+ Optional<String> clientCertificatePassword =
config.getOptionalValue("azure.client.certificate.password", String.class);
+ return (clientId.isPresent() && tenantId.isPresent() &&
+ (username.isPresent() || password.isPresent() ||
clientCertificate.isPresent() || clientSecret.isPresent()
+ || clientCertificatePassword.isPresent()));
+ }
+
+ public static boolean isSharedAccessKeyAvailable() {
+ Config config = ConfigProvider.getConfig();
+ if (isMockBackEnd()) {
+ return false;
+ }
+ return config.getOptionalValue("azure.event.hubs.shared.access.name",
String.class).isPresent()
+ &&
config.getOptionalValue("azure.event.hubs.shared.access.key",
String.class).isPresent();
+ }
+
+ public static boolean isMockBackEnd() {
+ Config config = ConfigProvider.getConfig();
+ return config.getOptionalValue("camel.quarkus.start.mock.backend",
Boolean.class).orElse(true);
+ }
+
+ public static Map<String, String> parseConnectionString(String
connectionString) {
+ Map<String, String> properties = new HashMap<>();
+ ConnectionStringProperties stringProperties = new
ConnectionStringProperties(connectionString);
+ properties.put("Endpoint", stringProperties.getEndpoint().toString());
+ properties.put("EntityPath", stringProperties.getEntityPath());
+ properties.put("SharedAccessKey",
stringProperties.getSharedAccessKeyName());
+ properties.put("SharedAccessKeyValue",
stringProperties.getSharedAccessKey());
+
+ String host = stringProperties.getEndpoint().getHost();
+ properties.put("Namespace", host.substring(0, host.indexOf('.')));
+
+ return properties;
+ }
+}
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsProducers.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsProducers.java
new file mode 100644
index 0000000000..82830c6dbd
--- /dev/null
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsProducers.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.azure.eventhubs.it;
+
+import java.util.Optional;
+
+import com.azure.core.amqp.implementation.ConnectionStringProperties;
+import com.azure.core.credential.TokenCredential;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import com.azure.messaging.eventhubs.implementation.ClientConstants;
+import
com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
+import jakarta.inject.Named;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+public class AzureEventhubsProducers {
+ @ConfigProperty(name = "azure.event.hubs.connection.string")
+ Optional<String> connectionString;
+
+ @Named("connectionStringTokenCredential")
+ TokenCredential tokenCredential() {
+ if (connectionString.isPresent()) {
+ ConnectionStringProperties properties = new
ConnectionStringProperties(connectionString.get());
+ TokenCredential tokenCredential;
+ if (properties.getSharedAccessSignature() == null) {
+ tokenCredential = new
EventHubSharedKeyCredential(properties.getSharedAccessKeyName(),
+ properties.getSharedAccessKey(),
ClientConstants.TOKEN_VALIDITY);
+ } else {
+ tokenCredential = new
EventHubSharedKeyCredential(properties.getSharedAccessSignature());
+ }
+ return tokenCredential;
+ }
+ return null;
+ }
+
+ @Named("eventHubClient")
+ EventHubProducerAsyncClient eventHubClient() {
+ return connectionString.map(connection -> new EventHubClientBuilder()
+ .connectionString(connection)
+ .buildAsyncProducerClient())
+ .orElse(null);
+ }
+}
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java
index 740d24f9cf..c69dd53710 100644
--- a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsResource.java
@@ -17,77 +17,133 @@
package org.apache.camel.quarkus.component.azure.eventhubs.it;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
-import io.quarkus.scheduler.Scheduled;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.camel.CamelContext;
-import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.azure.eventhubs.EventHubsConstants;
import org.apache.camel.component.mock.MockEndpoint;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.logging.Logger;
@Path("/azure-eventhubs")
@ApplicationScoped
public class AzureEventhubsResource {
+ private static final Logger LOG =
Logger.getLogger(AzureEventhubsResource.class);
@Inject
ProducerTemplate producerTemplate;
- @Inject
- ConsumerTemplate consumerTemplate;
-
@Inject
CamelContext context;
- @ConfigProperty(name = "azure.event.hubs.connection.string")
- Optional<String> connectionString;
-
- private volatile String message;
- private int counter = 0;
-
- /**
- * For some reason if we send just a single message, it is not always
received by the consumer.
- * Sending multiple messages seems to be more reliable.
- */
- @Scheduled(every = "1s")
- void schedule() {
- if (message != null) {
- final String endpointUri =
"azure-eventhubs:?connectionString=RAW(" + connectionString.get() + ")";
- producerTemplate.sendBody(endpointUri, message + (counter++));
+ @Path("/receive-event")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Object> receiveEvent(@QueryParam("endpointUri") String
endpointUri, String match) {
+ final MockEndpoint mockEndpoint = context.getEndpoint(endpointUri,
MockEndpoint.class);
+ List<Exchange> receivedExchanges = mockEndpoint.getReceivedExchanges();
+
+ Optional<Exchange> optionalExchange = receivedExchanges.stream()
+ .filter(exchange ->
exchange.getMessage().getBody(String.class).equals(match))
+ .findFirst();
+
+ if (optionalExchange.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Exchange exchange = optionalExchange.get();
+ Message message = exchange.getMessage();
+ return Map.of(
+ "body", message.getBody(String.class),
+ "headers", message.getHeaders());
+ }
+
+ @Path("/send-event/{partitionId}")
+ @POST
+ @Consumes(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response sendEvent(
+ @PathParam("partitionId") String partitionId,
+ @QueryParam("endpointUri") String endpointUri,
+ String message) throws Exception {
+
+ if (ObjectHelper.isEmpty(endpointUri)) {
+ endpointUri = "direct:sendEvent";
}
+
+ LOG.infof("Producing event to endpoint uri: %s", endpointUri);
+
+ producerTemplate.sendBodyAndHeader(endpointUri, message,
EventHubsConstants.PARTITION_ID, partitionId);
+ return Response.created(new URI("https://camel.apache.org/")).build();
}
@Path("/receive-events")
@GET
@Produces(MediaType.APPLICATION_JSON)
- public List<String> receiveEvents() throws Exception {
+ public List<Map<String, Object>> receiveEvents(@QueryParam("endpointUri")
String endpointUri, List<String> matches) {
+ final MockEndpoint mockEndpoint = context.getEndpoint(endpointUri,
MockEndpoint.class);
+ List<Exchange> receivedExchanges = mockEndpoint.getReceivedExchanges();
+
+ List<Exchange> exchanges = receivedExchanges.stream()
+ .filter(exchange ->
matches.contains(exchange.getMessage().getBody(String.class)))
+ .toList();
+
+ if (exchanges.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<Map<String, Object>> result = new ArrayList<>();
+ for (Exchange exchange : exchanges) {
+ Message message = exchange.getMessage();
+ result.add(Map.of(
+ "body", message.getBody(String.class),
+ "headers", message.getHeaders()));
+ }
- final MockEndpoint mockEndpoint =
context.getEndpoint("mock:azure-consumed", MockEndpoint.class);
- return mockEndpoint.getReceivedExchanges().stream()
- .map(Exchange::getMessage)
- .map(m -> m.getBody(String.class))
- .collect(Collectors.toList());
+ return result;
}
- @Path("/send-events")
+ @Path("/send-events/{partitionId}")
@POST
+ @Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.TEXT_PLAIN)
- @Consumes(MediaType.TEXT_PLAIN)
- public Response sendEvents(String body) throws Exception {
- this.message = body; // start sending the messages via schedule()
+ public Response sendEvents(@PathParam("partitionId") String partitionId,
List<String> messages) throws Exception {
+ producerTemplate.sendBodyAndHeader("direct:sendEvent", messages,
EventHubsConstants.PARTITION_ID, partitionId);
return Response.created(new URI("https://camel.apache.org/")).build();
}
+ @Path("/route/{routeId}/start")
+ @POST
+ public void startRoute(@PathParam("routeId") String routeId) throws
Exception {
+ LOG.infof("Starting route: %s", routeId);
+ context.getRouteController().startRoute(routeId);
+ // A random jitter value is applied in the Event Hubs client before
its message listener is active.
+ // In addition, claiming ownership of partitions seems to take an
indeterminate amount of time.
+ // Therefore, we need to wait until it's safe to produce events
+ Thread.sleep(5000);
+ }
+
+ @Path("/route/{routeId}/stop")
+ @POST
+ public void stopRoute(@PathParam("routeId") String routeId) throws
Exception {
+ context.getRouteController().stopRoute(routeId);
+ }
}
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java
index 3a787c8478..d9b611a61a 100644
--- a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsRoutes.java
@@ -16,15 +16,21 @@
*/
package org.apache.camel.quarkus.component.azure.eventhubs.it;
+import java.util.Map;
import java.util.Optional;
import com.azure.core.amqp.AmqpTransportType;
+import com.azure.core.credential.TokenCredential;
+import com.azure.messaging.eventhubs.models.EventPosition;
import jakarta.enterprise.context.ApplicationScoped;
-import org.apache.camel.builder.RouteBuilder;
+import jakarta.inject.Inject;
+import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
+import org.apache.camel.component.azure.eventhubs.CredentialType;
+import org.apache.camel.quarkus.test.mock.backend.MockBackendUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
-public class AzureEventhubsRoutes extends RouteBuilder {
+public class AzureEventhubsRoutes extends EndpointRouteBuilder {
@ConfigProperty(name = "azure.storage.account-name")
String azureStorageAccountName;
@@ -38,22 +44,178 @@ public class AzureEventhubsRoutes extends RouteBuilder {
@ConfigProperty(name = "azure.event.hubs.blob.container.name")
Optional<String> azureBlobContainerName;
- @ConfigProperty(name = "camel.quarkus.start.mock.backend", defaultValue =
"true")
- boolean startMockBackend;
+ @Inject
+ TokenCredential tokenCredential;
@Override
- public void configure() throws Exception {
- if (connectionString.isPresent() &&
azureBlobContainerName.isPresent()) {
- from("azure-eventhubs:?connectionString=RAW(" +
connectionString.get()
- + ")&blobAccountName=RAW(" + azureStorageAccountName
- + ")&blobAccessKey=RAW(" + azureStorageAccountKey
- + ")&blobContainerName=RAW(" +
azureBlobContainerName.get() + ")&amqpTransportType="
- + AmqpTransportType.AMQP)
- .to("mock:azure-consumed");
- } else if (!startMockBackend) {
+ public void configure() {
+ if (!MockBackendUtils.startMockBackend() &&
!AzureCredentialsHelper.isMinimumConfigurationAvailable()) {
throw new IllegalStateException(
- "azure.event.hubs.connection.string and
azure.event.hubs.blob.container.name must be set when
camel.quarkus.start.mock.backend == false");
+ "Configuration properties
azure.event.hubs.connection.string, azure.event.hubs.blob.container.name &
azure.storage.account-key must be set when camel.quarkus.start.mock.backend ==
false");
}
- }
+ if (AzureCredentialsHelper.isMinimumConfigurationAvailable()) {
+ Map<String, String> connectionProperties =
AzureCredentialsHelper.parseConnectionString(connectionString.get());
+ String eventHubsPath =
"%s/%s".formatted(connectionProperties.get("Namespace"),
+ connectionProperties.get("EntityPath"));
+
+ // Consumes EventHub messages and routes them based on which
partition they are associated with
+ from(azureEventhubs("")
+ .connectionString(connectionString.get())
+ .blobAccountName(azureStorageAccountName)
+ .blobAccessKey(azureStorageAccountKey)
+ .blobContainerName(azureBlobContainerName.get()))
+ .routeId("eventhubs-consumer")
+ .autoStartup(false)
+ .log("Consumed event payload ${body} from partition
${header.CamelAzureEventHubsPartitionId}")
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
0"))
+ .to("mock:partition-0-results")
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
1"))
+ .to("mock:partition-1-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ // Consumes events from partition 2 with InMemoryCheckpointStore
+ from(azureEventhubs("")
+ .connectionString(connectionString.get())
+ .checkpointStore(new InMemoryCheckpointStore()))
+ .routeId("eventhubs-consumer-custom-checkpoint-store")
+ .autoStartup(false)
+ .log("Consumed event payload ${body} from partition
${header.CamelAzureEventHubsPartitionId} with InMemoryCheckpointStore")
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
2"))
+ .to("mock:partition-2-initial-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ // Reads all events sent to partition 2 from the beginning
+ from(azureEventhubs("")
+ .connectionString(connectionString.get())
+ .checkpointStore(new InMemoryCheckpointStore())
+ .eventPosition(Map.of("2", EventPosition.earliest())))
+ .routeId("eventhubs-consumer-with-event-position")
+ .autoStartup(false)
+ .log("Consumed event payload ${body} from partition
${header.CamelAzureEventHubsPartitionId} from EventPosition.earliest")
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
2"))
+ .to("mock:partition-2-event-position-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ // Consumes events from partition 3 using a custom TokenCredential
+ from(azureEventhubs(eventHubsPath)
+ .credentialType(CredentialType.TOKEN_CREDENTIAL)
+ .tokenCredential(tokenCredential)
+ .blobAccountName(azureStorageAccountName)
+ .blobAccessKey(azureStorageAccountKey)
+ .blobContainerName(azureBlobContainerName.get()))
+ .routeId("eventhubs-consumer-custom-token-credential")
+ .autoStartup(false)
+ .log("Consumed event payload ${body} from partition
${header.CamelAzureEventHubsPartitionId} with TokenCredential")
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
3"))
+ .to("mock:partition-3-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ // Consumes events from partition 4 using WS transport
+ from(azureEventhubs("")
+ .connectionString(connectionString.get())
+ .blobAccountName(azureStorageAccountName)
+ .blobAccessKey(azureStorageAccountKey)
+ .blobContainerName(azureBlobContainerName.get())
+ .amqpTransportType(AmqpTransportType.AMQP_WEB_SOCKETS))
+ .routeId("eventhubs-consumer-with-amqp-ws-transport")
+ .autoStartup(false)
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
4"))
+ .to("mock:partition-4-ws-transport-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ from("direct:sendEvent")
+ .to(azureEventhubs("")
+ .connectionString(connectionString.get()));
+
+ from("direct:sendEventUsingAmqpWebSockets")
+ .to(azureEventhubs("")
+ .connectionString(connectionString.get())
+
.amqpTransportType(AmqpTransportType.AMQP_WEB_SOCKETS));
+
+ from("direct:sendEventUsingTokenCredential")
+ .to(azureEventhubs(eventHubsPath)
+ .credentialType(CredentialType.TOKEN_CREDENTIAL)
+ .tokenCredential(tokenCredential));
+
+ // Consumes EventHub messages that are produced by the custom
client in direct:sendEventUsingCustomClient
+ from(azureEventhubs("")
+ .connectionString(connectionString.get())
+ .blobAccountName(azureStorageAccountName)
+ .blobAccessKey(azureStorageAccountKey)
+ .blobContainerName(azureBlobContainerName.get()))
+ .routeId("eventhubs-consumer-for-custom-client")
+ .autoStartup(false)
+ .log("Consumed event payload ${body} from partition
${header.CamelAzureEventHubsPartitionId}")
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
0"))
+ .to("mock:partition-0-custom-client-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ from("direct:sendEventUsingCustomClient")
+ .to(azureEventhubs("")
+ .producerAsyncClient("#eventHubClient"));
+
+ // Consumes using an auto-generated connection string from the
shared access configuration
+ from(azureEventhubs(eventHubsPath)
+
.sharedAccessName(connectionProperties.get("SharedAccessKey"))
+
.sharedAccessKey(connectionProperties.get("SharedAccessKeyValue"))
+ .blobAccountName(azureStorageAccountName)
+ .blobAccessKey(azureStorageAccountKey)
+ .blobContainerName(azureBlobContainerName.get()))
+ .routeId("eventhubs-consumer-generated-connection-string")
+ .autoStartup(false)
+ .log("Consumed event payload ${body} from partition
${header.CamelAzureEventHubsPartitionId}")
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId} ==
0"))
+ .to("mock:partition-0-generated-connection-string-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ from("direct:sendEventWithGeneratedConnectionString")
+ .to(azureEventhubs(eventHubsPath)
+
.sharedAccessName(connectionProperties.get("SharedAccessKey"))
+
.sharedAccessKey(connectionProperties.get("SharedAccessKeyValue")));
+
+ if (AzureCredentialsHelper.isAzureIdentityCredentialsAvailable()) {
+ // Consumes events from partition 4 using AZURE_IDENTITY
credential type
+ from(azureEventhubs(eventHubsPath)
+ .credentialType(CredentialType.AZURE_IDENTITY)
+ // TODO: Remove shared access config
+ // https://github.com/apache/camel-quarkus/issues/6368
+ .sharedAccessName("fake-name")
+ .sharedAccessKey("fake-key")
+ .blobAccountName(azureStorageAccountName)
+ .blobAccessKey(azureStorageAccountKey)
+ .blobContainerName(azureBlobContainerName.get()))
+
.routeId("eventhubs-consumer-azure-identity-credential")
+ .autoStartup(false)
+ .log("Consumed event payload ${body} from partition
${header.CamelAzureEventHubsPartitionId} with TokenCredential")
+ .choice()
+ .when(simple("${header.CamelAzureEventHubsPartitionId}
== 4"))
+ .to("mock:partition-4-results")
+ .otherwise()
+ .log("Message received from unexpected partition id
${header.CamelAzureEventHubsPartitionId}");
+
+ from("direct:sendEventUsingAzureIdentity")
+ .to(azureEventhubs(eventHubsPath)
+ // TODO: Remove shared access config
+ //
https://github.com/apache/camel-quarkus/issues/6368
+ .sharedAccessName("fake-name")
+ .sharedAccessKey("fake-key")
+
.credentialType(CredentialType.AZURE_IDENTITY));
+ }
+ }
+ }
}
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/InMemoryCheckpointStore.java b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/InMemoryCheckpointStore.java
new file mode 100644
index 0000000000..3033810539
--- /dev/null
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/java/org/apache/camel/quarkus/component/azure/eventhubs/it/InMemoryCheckpointStore.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package org.apache.camel.quarkus.component.azure.eventhubs.it;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.azure.core.util.CoreUtils;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.eventhubs.CheckpointStore;
+import com.azure.messaging.eventhubs.models.Checkpoint;
+import com.azure.messaging.eventhubs.models.PartitionOwnership;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import static
com.azure.messaging.eventhubs.implementation.ClientConstants.OWNER_ID_KEY;
+import static
com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;
+import static
com.azure.messaging.eventhubs.implementation.ClientConstants.SEQUENCE_NUMBER_KEY;
+
+/**
+ * An in-memory checkpoint store. This is primarily to test custom event
positioning. Inspired by
+ *
https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/SampleCheckpointStore.java
+ */
+public class InMemoryCheckpointStore implements CheckpointStore {
+ private static final String OWNERSHIP = "ownership";
+ private static final String SEPARATOR = "/";
+ private static final String CHECKPOINT = "checkpoint";
+ private final Map<String, PartitionOwnership> partitionOwnershipMap = new
ConcurrentHashMap<>();
+ private final Map<String, Checkpoint> checkpointsMap = new
ConcurrentHashMap<>();
+ private static final ClientLogger LOGGER = new
ClientLogger(InMemoryCheckpointStore.class);
+
+ @Override
+ public Flux<PartitionOwnership> listOwnership(String
fullyQualifiedNamespace, String eventHubName,
+ String consumerGroup) {
+ LOGGER.info("Listing partition ownership");
+
+ String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName,
consumerGroup, OWNERSHIP);
+ return Flux.fromIterable(partitionOwnershipMap.keySet())
+ .filter(key -> key.startsWith(prefix))
+ .map(key -> partitionOwnershipMap.get(key));
+ }
+
+ private String prefixBuilder(String fullyQualifiedNamespace, String
eventHubName, String consumerGroup,
+ String type) {
+ return new StringBuilder()
+ .append(fullyQualifiedNamespace)
+ .append(SEPARATOR)
+ .append(eventHubName)
+ .append(SEPARATOR)
+ .append(consumerGroup)
+ .append(SEPARATOR)
+ .append(type)
+ .toString()
+ .toLowerCase(Locale.ROOT);
+ }
+
+ @Override
+ public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership>
requestedPartitionOwnerships) {
+ if (CoreUtils.isNullOrEmpty(requestedPartitionOwnerships)) {
+ return Flux.empty();
+ }
+ PartitionOwnership firstEntry = requestedPartitionOwnerships.get(0);
+ String prefix = prefixBuilder(firstEntry.getFullyQualifiedNamespace(),
firstEntry.getEventHubName(),
+ firstEntry.getConsumerGroup(), OWNERSHIP);
+
+ return Flux.fromIterable(requestedPartitionOwnerships)
+ .filter(ownershipRequest -> {
+ final String key = prefix + SEPARATOR +
ownershipRequest.getPartitionId();
+ final PartitionOwnership existing =
partitionOwnershipMap.get(key);
+
+ if (existing == null) {
+ return true;
+ }
+
+ return
existing.getETag().equals(ownershipRequest.getETag());
+ })
+ .doOnNext(partitionOwnership -> LOGGER.atInfo()
+ .addKeyValue(PARTITION_ID_KEY,
partitionOwnership.getPartitionId())
+ .addKeyValue(OWNER_ID_KEY,
partitionOwnership.getOwnerId())
+ .log("Ownership claimed"))
+ .map(partitionOwnership -> {
+ partitionOwnership.setETag(UUID.randomUUID().toString())
+ .setLastModifiedTime(System.currentTimeMillis());
+
+ final String key = prefix + SEPARATOR +
partitionOwnership.getPartitionId();
+ partitionOwnershipMap.put(key, partitionOwnership);
+
+ return partitionOwnership;
+ });
+ }
+
+ @Override
+ public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace,
String eventHubName, String consumerGroup) {
+ String prefix = prefixBuilder(fullyQualifiedNamespace, eventHubName,
consumerGroup, CHECKPOINT);
+ return Flux.fromIterable(checkpointsMap.keySet())
+ .filter(key -> key.startsWith(prefix))
+ .map(checkpointsMap::get);
+ }
+
+ @Override
+ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
+ if (checkpoint == null) {
+ return Mono.error(LOGGER.logExceptionAsError(new
NullPointerException("checkpoint cannot be null")));
+ }
+
+ String prefix = prefixBuilder(checkpoint.getFullyQualifiedNamespace(),
checkpoint.getEventHubName(),
+ checkpoint.getConsumerGroup(), CHECKPOINT);
+ checkpointsMap.put(prefix + SEPARATOR + checkpoint.getPartitionId(),
checkpoint);
+ LOGGER.atInfo()
+ .addKeyValue(PARTITION_ID_KEY, checkpoint.getPartitionId())
+ .addKeyValue(SEQUENCE_NUMBER_KEY,
checkpoint.getSequenceNumber())
+ .log("Updated checkpoint.");
+ return Mono.empty();
+ }
+}
diff --git a/integration-test-groups/azure/azure-eventhubs/src/main/resources/application.properties b/integration-test-groups/azure/azure-eventhubs/src/main/resources/application.properties
new file mode 100644
index 0000000000..52792c953e
--- /dev/null
+++ b/integration-test-groups/azure/azure-eventhubs/src/main/resources/application.properties
@@ -0,0 +1,22 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# Disable autowiring of EventHubProducerAsyncClient as we want control of
which endpoints the custom client is used in
+camel.component.azure-eventhubs.autowired-enabled = false
+
+# Uncomment to reduce log noise from com.azure internals
+# quarkus.log.category."com.azure".level = OFF
diff --git
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
index 8489a5b69e..ce91d90c03 100644
---
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
+++
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsIT.java
@@ -21,7 +21,7 @@ import
org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
@EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_NAME", matches =
".+")
@EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_KEY", matches =
".+")
-@EnabledIfEnvironmentVariable(named = "AZURE_BLOB_CONTAINER_NAME", matches =
".+")
+@EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_BLOB_CONTAINER_NAME",
matches = ".+")
@EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_CONNECTION_STRING",
matches = ".+")
@QuarkusIntegrationTest
class AzureEventhubsIT extends AzureEventhubsTest {
diff --git
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
index 5b7c8627ca..88449d6187 100644
---
a/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
+++
b/integration-test-groups/azure/azure-eventhubs/src/test/java/org/apache/camel/quarkus/component/azure/eventhubs/it/AzureEventhubsTest.java
@@ -16,45 +16,414 @@
*/
package org.apache.camel.quarkus.component.azure.eventhubs.it;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
-import org.apache.commons.lang3.RandomStringUtils;
+import io.restassured.response.Response;
import org.awaitility.Awaitility;
-import org.jboss.logging.Logger;
+import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
@EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_NAME", matches =
".+")
@EnabledIfEnvironmentVariable(named = "AZURE_STORAGE_ACCOUNT_KEY", matches =
".+")
@EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_BLOB_CONTAINER_NAME",
matches = ".+")
@EnabledIfEnvironmentVariable(named = "AZURE_EVENT_HUBS_CONNECTION_STRING",
matches = ".+")
@QuarkusTest
class AzureEventhubsTest {
+ // NOTE: Consumer endpoints are started / stopped manually to prevent them
from interfering with each other
+
+ @Test
+ void produceConsumeEvents() {
+ try {
+ RestAssured.given()
+ .post("/azure-eventhubs/route/eventhubs-consumer/start")
+ .then()
+ .statusCode(204);
+
+ final String messageBody = UUID.randomUUID().toString();
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(messageBody)
+ .post("/azure-eventhubs/send-event/0")
+ .then()
+ .statusCode(201);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("endpointUri", "mock:partition-0-results")
+ .body(messageBody)
+ .get("/azure-eventhubs/receive-event")
+ .then()
+ .statusCode(200)
+ .body(
+ "body", is(messageBody),
+ "headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "headers.CamelAzureEventHubsPartitionId",
is("0"),
+ "headers.CamelAzureEventHubsSequenceNumber",
greaterThanOrEqualTo(0));
+ });
+ } finally {
+ RestAssured.given()
+ .post("/azure-eventhubs/route/eventhubs-consumer/stop")
+ .then()
+ .statusCode(204);
+ }
+ }
+
+ @Test
+ void produceMultipleMessages() {
+ try {
+ RestAssured.given()
+ .post("/azure-eventhubs/route/eventhubs-consumer/start")
+ .then()
+ .statusCode(204);
+
+ List<String> messages = new ArrayList<>(3);
+ for (int i = 0; i < 3; i++) {
+ messages.add(UUID.randomUUID().toString());
+ }
+
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(messages)
+ .post("/azure-eventhubs/send-events/1")
+ .then()
+ .statusCode(201);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .queryParam("endpointUri", "mock:partition-1-results")
+ .body(messages)
+ .get("/azure-eventhubs/receive-events")
+ .then()
+ .statusCode(200)
+ .body(
+ "size()", is(3),
+ "[0].body", is(messages.get(0)),
+ "[0].headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "[0].headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "[0].headers.CamelAzureEventHubsPartitionId",
is("1"),
+
"[0].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+ "[1].body", is(messages.get(1)),
+ "[1].headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "[1].headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "[1].headers.CamelAzureEventHubsPartitionId",
is("1"),
+
"[1].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+ "[2].body", is(messages.get(2)),
+ "[2].headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "[2].headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "[2].headers.CamelAzureEventHubsPartitionId",
is("1"),
+
"[2].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0));
+ });
+ } finally {
+ RestAssured.given()
+ .post("/azure-eventhubs/route/eventhubs-consumer/stop")
+ .then()
+ .statusCode(204);
+ }
+ }
+
+ @Test
+ void produceConsumeEventsWithCustomClient() {
+ try {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-for-custom-client/start")
+ .then()
+ .statusCode(204);
+
+ final String messageBody = UUID.randomUUID().toString();
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .queryParam("endpointUri",
"direct:sendEventUsingCustomClient")
+ .body(messageBody)
+ .post("/azure-eventhubs/send-event/0")
+ .then()
+ .statusCode(201);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("endpointUri",
"mock:partition-0-custom-client-results")
+ .body(messageBody)
+ .get("/azure-eventhubs/receive-event")
+ .then()
+ .statusCode(200)
+ .body(
+ "body", is(messageBody),
+ "headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "headers.CamelAzureEventHubsPartitionId",
is("0"),
+ "headers.CamelAzureEventHubsSequenceNumber",
greaterThanOrEqualTo(0));
+ });
+ } finally {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-for-custom-client/stop")
+ .then()
+ .statusCode(204);
+ }
+ }
+
+ @Test
+ void customEventPosition() {
+ try {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-custom-checkpoint-store/start")
+ .then()
+ .statusCode(204);
+
+ // Send some messages to partition 2
+ List<String> messages = new ArrayList<>(3);
+ for (int i = 0; i < 3; i++) {
+ messages.add(UUID.randomUUID().toString());
+ }
+
+ RestAssured.given()
+ .contentType(ContentType.JSON)
+ .body(messages)
+ .post("/azure-eventhubs/send-events/2")
+ .then()
+ .statusCode(201);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("endpointUri",
"mock:partition-2-initial-results")
+ .contentType(ContentType.JSON)
+ .body(messages)
+ .get("/azure-eventhubs/receive-events")
+ .then()
+ .statusCode(200)
+ .body(
+ "size()", is(3),
+ "[0].body", is(messages.get(0)),
+ "[0].headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "[0].headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "[0].headers.CamelAzureEventHubsPartitionId",
is("2"),
+
"[0].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+ "[1].body", is(messages.get(1)),
+ "[1].headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "[1].headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "[1].headers.CamelAzureEventHubsPartitionId",
is("2"),
+
"[1].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0),
+ "[2].body", is(messages.get(2)),
+ "[2].headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "[2].headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "[2].headers.CamelAzureEventHubsPartitionId",
is("2"),
+
"[2].headers.CamelAzureEventHubsSequenceNumber", greaterThanOrEqualTo(0));
+ });
+
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-custom-checkpoint-store/stop")
+ .then()
+ .statusCode(204);
+
+ // Start another consumer configured to read partition 2 from the
earliest offset
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-with-event-position/start")
+ .then()
+ .statusCode(204);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ Response response = RestAssured.given()
+ .queryParam("endpointUri",
"mock:partition-2-event-position-results")
+ .contentType(ContentType.JSON)
+ .body(messages)
+ .get("/azure-eventhubs/receive-events")
+ .then()
+ .statusCode(200)
+ .extract()
+ .response();
+
+ // Based on the data retention period configured on the
EventHub, we can't make assumptions about what data is in the partition
+ // Therefore, we assume the last 3 events will be the ones
produced earlier in the test
+ List<Map<String, Object>> results =
response.jsonPath().getList("$.");
+ int size = results.size();
+ assertTrue(size >= 3);
+ assertEquals(messages.get(0), results.get(size -
3).get("body"));
+ assertEquals(messages.get(1), results.get(size -
2).get("body"));
+ assertEquals(messages.get(2), results.get(size -
1).get("body"));
+ });
+ } finally {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-with-event-position/stop")
+ .then()
+ .statusCode(204);
+ }
+ }
+
+ @Test
+ void tokenCredentials() {
+ try {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-custom-token-credential/start")
+ .then()
+ .statusCode(204);
+
+ final String messageBody = UUID.randomUUID().toString();
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .queryParam("endpointUri",
"direct:sendEventUsingTokenCredential")
+ .body(messageBody)
+ .post("/azure-eventhubs/send-event/3")
+ .then()
+ .statusCode(201);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("endpointUri", "mock:partition-3-results")
+ .body(messageBody)
+ .get("/azure-eventhubs/receive-event")
+ .then()
+ .statusCode(200)
+ .body(
+ "body", is(messageBody),
+ "headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "headers.CamelAzureEventHubsPartitionId",
is("3"),
+ "headers.CamelAzureEventHubsSequenceNumber",
greaterThanOrEqualTo(0));
+ });
+ } finally {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-custom-token-credential/stop")
+ .then()
+ .statusCode(204);
+ }
+ }
+
+ @Test
+ void azureIdentityCredentials() {
+
Assumptions.assumeTrue(AzureCredentialsHelper.isAzureIdentityCredentialsAvailable());
- private static final Logger LOG =
Logger.getLogger(AzureEventhubsTest.class);
+ try {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-azure-identity-credential/start")
+ .then()
+ .statusCode(204);
+
+ final String messageBody = UUID.randomUUID().toString();
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .queryParam("endpointUri",
"direct:sendEventUsingAzureIdentity")
+ .body(messageBody)
+ .post("/azure-eventhubs/send-event/4")
+ .then()
+ .statusCode(201);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("endpointUri", "mock:partition-4-results")
+ .body(messageBody)
+ .get("/azure-eventhubs/receive-event")
+ .then()
+ .statusCode(200)
+ .body(
+ "body", is(messageBody),
+ "headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "headers.CamelAzureEventHubsPartitionId",
is("4"),
+ "headers.CamelAzureEventHubsSequenceNumber",
greaterThanOrEqualTo(0));
+ });
+ } finally {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-azure-identity-credential/stop")
+ .then()
+ .statusCode(204);
+ }
+ }
+
+ @Test
+ void amqpWebSocketsTransport() {
+ try {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-with-amqp-ws-transport/start")
+ .then()
+ .statusCode(204);
+
+ final String messageBody = UUID.randomUUID().toString();
+
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .queryParam("endpointUri",
"direct:sendEventUsingAmqpWebSockets")
+ .body(messageBody)
+ .post("/azure-eventhubs/send-event/4")
+ .then()
+ .statusCode(201);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("endpointUri",
"mock:partition-4-ws-transport-results")
+ .body(messageBody)
+ .get("/azure-eventhubs/receive-event")
+ .then()
+ .statusCode(200)
+ .body(
+ "body", is(messageBody),
+ "headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "headers.CamelAzureEventHubsPartitionId",
is("4"),
+ "headers.CamelAzureEventHubsSequenceNumber",
greaterThanOrEqualTo(0));
+ });
+ } finally {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-with-amqp-ws-transport/stop")
+ .then()
+ .statusCode(204);
+ }
+ }
@Test
- public void roundTrip() {
- final String messageBody = RandomStringUtils.randomAlphabetic(30);
+ void generatedConnectionString() {
+ try {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-generated-connection-string/start")
+ .then()
+ .statusCode(204);
- RestAssured.given()
- .contentType(ContentType.TEXT)
- .body(messageBody)
- .post("/azure-eventhubs/send-events")
- .then()
- .statusCode(201);
+ final String messageBody = UUID.randomUUID().toString();
- Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120,
TimeUnit.SECONDS).until(() -> {
- final String body = RestAssured.given()
- .get("/azure-eventhubs/receive-events")
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .queryParam("endpointUri",
"direct:sendEventWithGeneratedConnectionString")
+ .body(messageBody)
+ .post("/azure-eventhubs/send-event/0")
.then()
- .extract().body().asString();
- LOG.infof("Expected message body: '%s'; actual: '%s'",
messageBody, body);
- return body != null && body.contains(messageBody);
- });
+ .statusCode(201);
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(1,
TimeUnit.MINUTES).untilAsserted(() -> {
+ RestAssured.given()
+ .queryParam("endpointUri",
"mock:partition-0-generated-connection-string-results")
+ .body(messageBody)
+ .get("/azure-eventhubs/receive-event")
+ .then()
+ .statusCode(200)
+ .body(
+ "body", is(messageBody),
+ "headers.CamelAzureEventHubsEnqueuedTime",
notNullValue(),
+ "headers.CamelAzureEventHubsOffset",
greaterThanOrEqualTo(0),
+ "headers.CamelAzureEventHubsPartitionId",
is("0"),
+ "headers.CamelAzureEventHubsSequenceNumber",
greaterThanOrEqualTo(0));
+ });
+ } finally {
+ RestAssured.given()
+
.post("/azure-eventhubs/route/eventhubs-consumer-generated-connection-string/stop")
+ .then()
+ .statusCode(204);
+ }
}
}
diff --git a/integration-test-groups/azure/azure-resources.sh
b/integration-test-groups/azure/azure-resources.sh
index 243d541662..f3a184be2e 100755
--- a/integration-test-groups/azure/azure-resources.sh
+++ b/integration-test-groups/azure/azure-resources.sh
@@ -74,7 +74,7 @@ function createResources() {
az storage container create --account-name ${AZURE_STORAGE_ACCOUNT_NAME}
--name ${AZURE_BLOB_CONTAINER_NAME} --auth-mode login
az eventhubs namespace create --name ${EH_NAMESPACE} --resource-group
${RESOURCE_GROUP} --location ${ZONE}
- az eventhubs eventhub create --name ${EH_NAME} --resource-group
${RESOURCE_GROUP} --namespace-name ${EH_NAMESPACE} --partition-count 1
+ az eventhubs eventhub create --name ${EH_NAME} --resource-group
${RESOURCE_GROUP} --namespace-name ${EH_NAMESPACE} --partition-count 5
AZURE_EVENT_HUBS_CONNECTION_STRING=$(az eventhubs namespace
authorization-rule keys list --resource-group ${RESOURCE_GROUP}
--namespace-name ${EH_NAMESPACE} --name RootManageSharedAccessKey --query
primaryConnectionString -o tsv)