This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 9ad75d9ead Add SQS batch consumer, JMS-like selector, KMS encryption
and Quarkus client tests
9ad75d9ead is described below
commit 9ad75d9ead3ac1829fac833ecd4284c4875f72e3
Author: JinyuChen97 <[email protected]>
AuthorDate: Mon Mar 23 13:53:51 2026 +0000
Add SQS batch consumer, JMS-like selector, KMS encryption and Quarkus
client tests
Fixes #2777
---
integration-test-groups/aws2/aws2-sqs/pom.xml | 5 +
.../component/aws2/sqs/it/Aws2SqsResource.java | 82 ++++++++++++++++
.../aws2/sqs/it/BatchConsumerMessageCollector.java | 40 ++++++++
.../aws2/sqs/it/BatchConsumerRouteBuilder.java | 38 ++++++++
.../aws2/sqs/it/SelectorMessageCollector.java | 40 ++++++++
.../aws2/sqs/it/SelectorRouteBuilder.java | 57 +++++++++++
.../quarkus/component/aws2/sqs/it/Aws2SqsTest.java | 104 +++++++++++++++++++++
.../aws2/sqs/it/Aws2SqsTestEnvCustomizer.java | 37 ++++++++
8 files changed, 403 insertions(+)
diff --git a/integration-test-groups/aws2/aws2-sqs/pom.xml
b/integration-test-groups/aws2/aws2-sqs/pom.xml
index 9d0af4b254..c443d66294 100644
--- a/integration-test-groups/aws2/aws2-sqs/pom.xml
+++ b/integration-test-groups/aws2/aws2-sqs/pom.xml
@@ -70,6 +70,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kms</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
index 6bd106b8b9..f4cb6cb2cb 100644
---
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
+++
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
@@ -20,6 +20,7 @@ import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Optional;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
@@ -30,6 +31,7 @@ import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.camel.ConsumerTemplate;
@@ -47,12 +49,24 @@ public class Aws2SqsResource extends BaseAws2Resource {
@ConfigProperty(name = "aws-sqs.queue-name")
String queueName;
+ @ConfigProperty(name = "aws-sqs.batch-consumer-name")
+ String batchConsumerQueueName;
+
+ @ConfigProperty(name = "aws-sqs.kms-key-id")
+ Optional<String> kmsKeyId;
+
@Inject
ProducerTemplate producerTemplate;
@Inject
ConsumerTemplate consumerTemplate;
+ @Inject
+ BatchConsumerMessageCollector batchConsumerMessageCollector;
+
+ @Inject
+ SelectorMessageCollector selectorMessageCollector;
+
public Aws2SqsResource() {
super("sqs");
}
@@ -169,6 +183,74 @@ public class Aws2SqsResource extends BaseAws2Resource {
String.class);
}
+    /**
+     * Publishes the request body to the queue configured under {@code aws-sqs.batch-consumer-name}.
+     *
+     * @param  message the plain-text payload to send
+     * @return         an empty 200 response once the send call has returned
+     */
+    @Path("batch-consumer/send")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    public Response sendToBatchConsumerQueue(String message) {
+        final String endpointUri = componentUri(batchConsumerQueueName);
+        producerTemplate.sendBody(endpointUri, message);
+        return Response.ok().build();
+    }
+
+    /**
+     * Exposes the message bodies currently held by the injected {@link BatchConsumerMessageCollector}.
+     *
+     * @return the collected bodies, produced as JSON
+     */
+    @Path("batch-consumer/messages")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<String> getBatchConsumerMessages() {
+        final List<String> collected = batchConsumerMessageCollector.getMessages();
+        return collected;
+    }
+
+    /**
+     * Empties the injected {@link BatchConsumerMessageCollector}.
+     *
+     * @return an empty 200 response
+     */
+    @Path("batch-consumer/messages")
+    @DELETE
+    public Response clearBatchConsumerMessages() {
+        batchConsumerMessageCollector.clear();
+        final Response emptyOk = Response.ok().build();
+        return emptyOk;
+    }
+
+    /**
+     * Sends the request body to an SQS queue through an endpoint configured with
+     * {@code serverSideEncryptionEnabled=true} and the configured KMS key.
+     *
+     * @param  kmsQueueName          name of the target queue, taken from the request path
+     * @param  message               the plain-text payload to send
+     * @return                       an empty 200 response once the send call has returned
+     * @throws IllegalStateException if {@code aws-sqs.kms-key-id} is not configured
+     */
+    @Path("kms/send/{queueName}")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    public Response sendToKmsQueue(@PathParam("queueName") String kmsQueueName, String message) {
+        producerTemplate.sendBody(kmsEncryptedQueueUri(kmsQueueName), message);
+        return Response.ok().build();
+    }
+
+    /**
+     * Receives one message body from the KMS-encrypted queue, passing a timeout of 10000 to
+     * {@code consumerTemplate.receiveBody}.
+     *
+     * @param  kmsQueueName          name of the queue to read from, taken from the request path
+     * @return                       whatever {@code receiveBody} yields for the endpoint, converted to a String
+     * @throws IllegalStateException if {@code aws-sqs.kms-key-id} is not configured
+     */
+    @Path("kms/receive/{queueName}")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String receiveFromKmsQueue(@PathParam("queueName") String kmsQueueName) {
+        // Same endpoint as the producer side; deleteAfterRead is only appended for consuming.
+        final String uri = kmsEncryptedQueueUri(kmsQueueName) + "&deleteAfterRead=true";
+        return consumerTemplate.receiveBody(uri, 10000, String.class);
+    }
+
+    /**
+     * Builds the endpoint URI shared by the KMS send and receive resources so the option list and the
+     * missing-key error are defined in one place.
+     *
+     * @param  kmsQueueName          name of the queue
+     * @return                       the {@code aws2-sqs} URI with auto-creation, server-side encryption and the KMS key id
+     * @throws IllegalStateException if {@code aws-sqs.kms-key-id} is not configured
+     */
+    private String kmsEncryptedQueueUri(String kmsQueueName) {
+        final String kmsId = kmsKeyId.orElseThrow(() -> new IllegalStateException("aws-sqs.kms-key-id not configured"));
+        return String.format("aws2-sqs://%s?autoCreateQueue=true&serverSideEncryptionEnabled=true&kmsMasterKeyId=%s",
+                kmsQueueName, kmsId);
+    }
+
+    /**
+     * Sends the request body to the given queue together with a header named
+     * {@link SelectorRouteBuilder#FILTER_ATTRIBUTE_NAME}; {@link SelectorRouteBuilder} looks up an attribute
+     * of that name on the messages it consumes.
+     *
+     * @param  targetQueueName name of the queue, taken from the request path
+     * @param  filterType      value for the filter header, taken from the {@code filterType} query parameter
+     * @param  message         the plain-text payload to send
+     * @return                 an empty 200 response once the send call has returned
+     */
+    @Path("selector/send/{queueName}")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    public Response sendWithSelectorAttribute(@PathParam("queueName") String targetQueueName,
+            @QueryParam("filterType") String filterType, String message) {
+        final String endpointUri = componentUri(targetQueueName);
+        producerTemplate.sendBodyAndHeader(endpointUri, message, SelectorRouteBuilder.FILTER_ATTRIBUTE_NAME, filterType);
+        return Response.ok().build();
+    }
+
+    /**
+     * Exposes the message bodies currently held by the injected {@link SelectorMessageCollector}.
+     *
+     * @return the collected bodies, produced as JSON
+     */
+    @Path("selector/messages")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<String> getSelectorMessages() {
+        final List<String> collected = selectorMessageCollector.getMessages();
+        return collected;
+    }
+
+    /**
+     * Empties the injected {@link SelectorMessageCollector}.
+     *
+     * @return an empty 200 response
+     */
+    @Path("selector/messages")
+    @DELETE
+    public Response clearSelectorMessages() {
+        selectorMessageCollector.clear();
+        final Response emptyOk = Response.ok().build();
+        return emptyOk;
+    }
+
private String componentUri() { // no-arg convenience overload: uses the queue configured under aws-sqs.queue-name
return componentUri(queueName);
}
diff --git
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerMessageCollector.java
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerMessageCollector.java
new file mode 100644
index 0000000000..947ed6ffe5
--- /dev/null
+++
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerMessageCollector.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+/**
+ * Application-scoped holder for the message bodies handed over by the batch consumer route.
+ * Bodies are kept in a {@link CopyOnWriteArrayList}.
+ */
+@ApplicationScoped
+public class BatchConsumerMessageCollector {
+
+    private final List<String> collected = new CopyOnWriteArrayList<>();
+
+    /**
+     * Appends one message body to the collection.
+     *
+     * @param message the body to remember
+     */
+    public void collect(String message) {
+        this.collected.add(message);
+    }
+
+    /**
+     * Returns an unmodifiable copy of everything collected so far.
+     *
+     * @return a snapshot of the collected bodies
+     */
+    public List<String> getMessages() {
+        final List<String> snapshot = List.copyOf(this.collected);
+        return snapshot;
+    }
+
+    /**
+     * Removes all collected bodies.
+     */
+    public void clear() {
+        this.collected.clear();
+    }
+}
diff --git
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerRouteBuilder.java
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerRouteBuilder.java
new file mode 100644
index 0000000000..9c4803aa1e
--- /dev/null
+++
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerRouteBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.camel.builder.RouteBuilder;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+/**
+ * Route that consumes from the queue configured under {@code aws-sqs.batch-consumer-name} with
+ * {@code maxMessagesPerPoll=5} and {@code deleteAfterRead=true}, and passes every body to the
+ * {@link BatchConsumerMessageCollector}.
+ */
+@ApplicationScoped
+public class BatchConsumerRouteBuilder extends RouteBuilder {
+
+    @ConfigProperty(name = "aws-sqs.batch-consumer-name")
+    String batchConsumerQueueName;
+
+    @Inject
+    BatchConsumerMessageCollector collector;
+
+    @Override
+    public void configure() {
+        final String consumerUri = "aws2-sqs://" + batchConsumerQueueName + "?maxMessagesPerPoll=5&deleteAfterRead=true";
+
+        from(consumerUri)
+                .process(exchange -> {
+                    // hand the body over as a String so the REST resource can return it as JSON
+                    final String body = exchange.getIn().getBody(String.class);
+                    collector.collect(body);
+                });
+    }
+}
diff --git
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorMessageCollector.java
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorMessageCollector.java
new file mode 100644
index 0000000000..a691777337
--- /dev/null
+++
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorMessageCollector.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+/**
+ * Application-scoped holder for the message bodies that passed the filter of the selector route.
+ * Bodies are kept in a {@link CopyOnWriteArrayList}.
+ */
+@ApplicationScoped
+public class SelectorMessageCollector {
+
+    private final List<String> collected = new CopyOnWriteArrayList<>();
+
+    /**
+     * Appends one message body to the collection.
+     *
+     * @param message the body to remember
+     */
+    public void collect(String message) {
+        this.collected.add(message);
+    }
+
+    /**
+     * Returns an unmodifiable copy of everything collected so far.
+     *
+     * @return a snapshot of the collected bodies
+     */
+    public List<String> getMessages() {
+        final List<String> snapshot = List.copyOf(this.collected);
+        return snapshot;
+    }
+
+    /**
+     * Removes all collected bodies.
+     */
+    public void clear() {
+        this.collected.clear();
+    }
+}
diff --git
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorRouteBuilder.java
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorRouteBuilder.java
new file mode 100644
index 0000000000..04f2597754
--- /dev/null
+++
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorRouteBuilder.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import java.util.Map;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.sqs.Sqs2Constants;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+/**
+ * Route that consumes from the queue configured under {@code aws-sqs.selector-name} and passes a body to the
+ * {@link SelectorMessageCollector} only when the message carries a {@value #FILTER_ATTRIBUTE_NAME} attribute
+ * whose string value is {@value #FILTER_ATTRIBUTE_SELECTED_VALUE}.
+ */
+@ApplicationScoped
+public class SelectorRouteBuilder extends RouteBuilder {
+
+    static final String FILTER_ATTRIBUTE_NAME = "filter-type";
+    static final String FILTER_ATTRIBUTE_SELECTED_VALUE = "selected";
+
+    @ConfigProperty(name = "aws-sqs.selector-name")
+    String selectorQueueName;
+
+    @Inject
+    SelectorMessageCollector collector;
+
+    @Override
+    public void configure() {
+        // NOTE(review): deleteIfFiltered=false with defaultVisibilityTimeout=0 presumably leaves filtered-out
+        // messages on the queue and immediately visible again - confirm against the component documentation.
+        final String consumerUri = "aws2-sqs://" + selectorQueueName
+                + "?messageAttributeNames=All&deleteAfterRead=true&deleteIfFiltered=false&defaultVisibilityTimeout=0";
+
+        from(consumerUri)
+                .filter(exchange -> {
+                    final Map<?, ?> attributes = exchange.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, Map.class);
+                    // accept only when the attribute map exists, holds a MessageAttributeValue under the
+                    // filter name, and that value is the selected one
+                    return attributes != null
+                            && attributes.get(FILTER_ATTRIBUTE_NAME) instanceof MessageAttributeValue selector
+                            && FILTER_ATTRIBUTE_SELECTED_VALUE.equals(selector.stringValue());
+                })
+                .process(exchange -> {
+                    final String body = exchange.getIn().getBody(String.class);
+                    collector.collect(body);
+                });
+    }
+}
diff --git
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
index 004d8fc9e8..0a52cae001 100644
---
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
+++
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
@@ -23,6 +23,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -36,11 +37,13 @@ import io.restassured.response.Response;
import org.apache.camel.quarkus.test.support.aws2.Aws2LocalStack;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
import org.apache.camel.quarkus.test.support.aws2.BaseAWs2TestSupport;
+import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import static org.hamcrest.Matchers.anyOf;
@@ -260,6 +263,107 @@ class Aws2SqsTest extends BaseAWs2TestSupport {
.asString());
}
+ @Test
+ void sqsBatchConsumer() {
+ // clean previously collected messages
+
RestAssured.delete("/aws2-sqs/batch-consumer/messages").then().statusCode(200);
+
+ final List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ String msg = "batch-consumer-" +
UUID.randomUUID().toString().replace("-", "");
+ messages.add(msg);
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(msg)
+ .post("/aws2-sqs/batch-consumer/send")
+ .then()
+ .statusCode(200);
+ }
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120,
TimeUnit.SECONDS).until(() -> {
+ List<?> received =
RestAssured.get("/aws2-sqs/batch-consumer/messages")
+ .then().statusCode(200).extract().body().as(List.class);
+ return received.size() >= messages.size();
+ });
+
+ List<?> received = RestAssured.get("/aws2-sqs/batch-consumer/messages")
+ .then().statusCode(200).extract().body().as(List.class);
+ Assertions.assertEquals(messages.size(), received.size());
+ Assertions.assertTrue(received.containsAll(messages));
+ }
+
+ @Test
+ void sqsKmsEncryption() {
+ Assumptions.assumeTrue(localStack, "KMS test only runs on LocalStack");
+
+ final String kmsQueueName = "camel-quarkus-kms-"
+ +
RandomStringUtils.secure().nextAlphanumeric(10).toLowerCase(Locale.ROOT);
+ final String msg = "kms-msg-" +
UUID.randomUUID().toString().replace("-", "");
+
+ try {
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(msg)
+ .post("/aws2-sqs/kms/send/" + kmsQueueName)
+ .then()
+ .statusCode(200);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(60,
TimeUnit.SECONDS).until(() -> {
+ ExtractableResponse<Response> resp =
RestAssured.get("/aws2-sqs/kms/receive/" + kmsQueueName)
+ .then().extract();
+ return resp.statusCode() == 200 &&
msg.equals(resp.body().asString());
+ });
+ } finally {
+ deleteQueue(kmsQueueName);
+ }
+ }
+
+ @Test
+ void sqsJmsLikeSelector() {
+ final String selectorQueueName =
ConfigProvider.getConfig().getValue("aws-sqs.selector-name", String.class);
+
+ // clean previously collected messages
+
RestAssured.delete("/aws2-sqs/selector/messages").then().statusCode(200);
+ purgeQueue(selectorQueueName);
+
+ final String selectedMsg = "selected-" +
UUID.randomUUID().toString().replace("-", "");
+ final String rejectedMsg = "rejected-" +
UUID.randomUUID().toString().replace("-", "");
+
+ // send message that matches the filter (filter-type=selected)
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(selectedMsg)
+ .queryParam("filterType",
SelectorRouteBuilder.FILTER_ATTRIBUTE_SELECTED_VALUE)
+ .post("/aws2-sqs/selector/send/" + selectorQueueName)
+ .then()
+ .statusCode(200);
+
+ // send message that does not match the filter (filter-type=rejected)
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(rejectedMsg)
+ .queryParam("filterType", "rejected")
+ .post("/aws2-sqs/selector/send/" + selectorQueueName)
+ .then()
+ .statusCode(200);
+
+ // wait for the selected message to be consumed
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(60,
TimeUnit.SECONDS).until(() -> {
+ List<?> collected = RestAssured.get("/aws2-sqs/selector/messages")
+ .then().statusCode(200).extract().body().as(List.class);
+ return collected.contains(selectedMsg);
+ });
+
+ // verify only the selected message was collected
+ List<?> collected = RestAssured.get("/aws2-sqs/selector/messages")
+ .then().statusCode(200).extract().body().as(List.class);
+ Assertions.assertTrue(collected.contains(selectedMsg), "Selected
message should have been collected");
+ Assertions.assertFalse(collected.contains(rejectedMsg), "Rejected
message should not have been collected");
+
+ // purge rejected messages remaining in queue
+ purgeQueue(selectorQueueName);
+ }
+
@Override
public void testMethodForDefaultCredentialsProvider() {
listQueues();
diff --git
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
index 48e92beae2..f4aa714a06 100644
---
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
+++
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
@@ -22,6 +22,8 @@ import
org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvContext;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvCustomizer;
import org.apache.camel.quarkus.test.support.aws2.Service;
import org.apache.commons.lang3.RandomStringUtils;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.kms.model.CreateKeyRequest;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
@@ -32,6 +34,11 @@ public class Aws2SqsTestEnvCustomizer implements
Aws2TestEnvCustomizer {
@Override
public Service[] localstackServices() {
+ return new Service[] { Service.SQS, Service.KMS }; // KMS is listed next to SQS because this customizer creates a KMS key when running on LocalStack
+ }
+
+ @Override
+ public Service[] exportCredentialsForLocalstackServices() { // NOTE(review): presumably keeps credential export limited to SQS even though localstackServices() also lists KMS - confirm against Aws2TestEnvCustomizer
return new Service[] { Service.SQS };
}
@@ -50,6 +57,14 @@ public class Aws2SqsTestEnvCustomizer implements
Aws2TestEnvCustomizer {
+
RandomStringUtils.secure().nextAlphanumeric(49).toLowerCase(Locale.ROOT);
envContext.property("aws-sqs.delayed-name", delayedQueueName);
+ final String batchConsumerQueueName = "camel-quarkus-batch-"
+ +
RandomStringUtils.secure().nextAlphanumeric(49).toLowerCase(Locale.ROOT);
+ envContext.property("aws-sqs.batch-consumer-name",
batchConsumerQueueName);
+
+ final String selectorQueueName = "camel-quarkus-selector-"
+ +
RandomStringUtils.secure().nextAlphanumeric(49).toLowerCase(Locale.ROOT);
+ envContext.property("aws-sqs.selector-name", selectorQueueName);
+
final SqsClient sqsClient = envContext.client(Service.SQS,
SqsClient::builder);
{
final String queueUrl = sqsClient.createQueue(
@@ -70,10 +85,32 @@ public class Aws2SqsTestEnvCustomizer implements
Aws2TestEnvCustomizer {
.build())
.queueUrl();
+ final String batchConsumerUrl = sqsClient.createQueue(
+ CreateQueueRequest.builder()
+ .queueName(batchConsumerQueueName)
+ .build())
+ .queueUrl();
+
+ final String selectorUrl = sqsClient.createQueue(
+ CreateQueueRequest.builder()
+ .queueName(selectorQueueName)
+ .build())
+ .queueUrl();
+
+ if (envContext.isLocalStack()) {
+ final KmsClient kmsClient = envContext.client(Service.KMS,
KmsClient::builder);
+ final String kmsKeyId = kmsClient
+
.createKey(CreateKeyRequest.builder().description("camel-quarkus-sqs-test").build())
+ .keyMetadata().keyId();
+ envContext.property("aws-sqs.kms-key-id", kmsKeyId);
+ }
+
envContext.closeable(() -> {
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build());
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(failingUrl).build());
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(deadletterUrl).build());
+
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(batchConsumerUrl).build());
+
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(selectorUrl).build());
try {
String url =
sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(delayedQueueName).build())