This is an automated email from the ASF dual-hosted git repository.

aldettinger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 01276cdc8d392d82d373b820daae0bddbcd148e9
Author: Vratislav Hais <[email protected]>
AuthorDate: Tue Sep 7 10:08:35 2021 +0200

    [#2777] Increase test coverage of aws2sqs component
---
 .../component/aws2/sqs/it/Aws2SqsSnsResource.java  | 104 +++++++++++++-
 .../component/aws2/sqs/it/Aws2SqsSnsTest.java      | 150 +++++++++++++++++++--
 2 files changed, 243 insertions(+), 11 deletions(-)

diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
index 003528a..1a48c60 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
@@ -22,10 +22,12 @@ import java.util.List;
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
@@ -33,8 +35,10 @@ import javax.ws.rs.core.Response;
 
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.aws2.sqs.Sqs2Constants;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
 
 @Path("/aws2-sqs-sns")
 @ApplicationScoped
@@ -79,11 +83,48 @@ public class Aws2SqsSnsResource {
                 .build();
     }
 
-    @Path("/sqs/receive")
+    @Path("/sqs/send/{queueName}")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response sqsSendSpecificQueue(@PathParam("queueName") String 
queueName, String message) throws Exception {
+        final String response = 
producerTemplate.requestBody(componentUri(queueName), message, String.class);
+        return Response
+                .created(new URI("https://camel.apache.org/"))
+                .entity(response)
+                .build();
+    }
+
+    @Path("/sqs/purge/queue/{queueName}")
+    @DELETE
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response purgeQueue(@PathParam("queueName") String queueName) 
throws Exception {
+        producerTemplate.sendBodyAndHeader(componentUri(queueName) + 
"?operation=purgeQueue",
+                null,
+                Sqs2Constants.SQS_QUEUE_PREFIX,
+                queueName);
+        return Response.ok().build();
+    }
+
+    @Path("/sqs/receive/{queueName}/{deleteMessage}")
     @GET
     @Produces(MediaType.TEXT_PLAIN)
-    public String sqsReceive() throws Exception {
-        return consumerTemplate.receiveBody(componentUri(), 10000, 
String.class);
+    public String sqsReceive(@PathParam("queueName") String queueName, 
@PathParam("deleteMessage") String deleteMessage)
+            throws Exception {
+        return consumerTemplate.receiveBody(componentUri(queueName)
+                + "?deleteAfterRead=" + deleteMessage + "&deleteIfFiltered=" + 
deleteMessage + "&defaultVisibilityTimeout=1",
+                10000,
+                String.class);
+    }
+
+    @Path("/sqs/receive/receipt/{queueName}")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String sqsReceipt(@PathParam("queueName") String queueName) throws 
Exception {
+        return consumerTemplate.receive(componentUri(queueName), 10000)
+                .getIn()
+                .getHeader(Sqs2Constants.RECEIPT_HANDLE)
+                .toString();
     }
 
     @Path("/sqs/queues")
@@ -94,10 +135,67 @@ public class Aws2SqsSnsResource {
                 .queueUrls();
     }
 
+    @Path("/sqs/batch")
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response sendBatchMessage(List<String> messages) throws Exception {
+        final SendMessageBatchResponse response = producerTemplate.requestBody(
+                componentUri() + "?operation=sendBatchMessage",
+                messages,
+                SendMessageBatchResponse.class);
+        return Response
+                .created(new URI("https://camel.apache.org/"))
+                .entity("" + response.successful().size())
+                .build();
+    }
+
+    @Path("/sqs/delete/message/{queueName}/{receipt}")
+    @DELETE
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response deleteMessage(@PathParam("queueName") String queueName, 
@PathParam("receipt") String receipt)
+            throws Exception {
+        producerTemplate.sendBodyAndHeader(componentUri(queueName) + 
"?operation=deleteMessage",
+                null,
+                Sqs2Constants.RECEIPT_HANDLE,
+                receipt);
+        return Response.ok().build();
+    }
+
+    @Path("/sqs/delete/queue/{queueName}")
+    @DELETE
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response deleteQueue(@PathParam("queueName") String queueName) 
throws Exception {
+        producerTemplate.sendBodyAndHeader(componentUri(queueName) + 
"?operation=deleteQueue",
+                null,
+                Sqs2Constants.SQS_QUEUE_PREFIX,
+                queueName);
+        return Response.ok().build();
+    }
+
+    @Path("/sqs/queue/autocreate/delayed/{queueName}/{delay}")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<String> autoCreateDelayedQueue(@PathParam("queueName") String 
queueName, @PathParam("delay") String delay)
+            throws Exception {
+        // queue creation without any operation resulted in 405 status code
+        return producerTemplate
+                .requestBody(
+                        "aws2-sqs://" + queueName
+                                + 
"?autoCreateQueue=true&delayQueue=true&delaySeconds=" + delay + 
"&operation=listQueues",
+                        null,
+                        ListQueuesResponse.class)
+                .queueUrls();
+    }
+
     private String componentUri() {
         return "aws2-sqs://" + queueName;
     }
 
+    private String componentUri(String queueName) {
+        return "aws2-sqs://" + queueName;
+    }
+
     @Path("/sns/send")
     @POST
     @Consumes(MediaType.TEXT_PLAIN)
diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
index 76b39a6..c68c2ad 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.quarkus.component.aws2.sqs.it;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -26,12 +31,12 @@ import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
 import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
 import org.awaitility.Awaitility;
-import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.core.Is.is;
 
 @QuarkusTest
@@ -40,28 +45,157 @@ class Aws2SqsSnsTest {
 
     @Test
     public void sqs() {
-        final Config config = ConfigProvider.getConfig();
-        final String queueName = config.getValue("aws-sqs.queue-name", 
String.class);
+        final String queueName = getPredefinedQueueName();
 
-        String[] queues = RestAssured.get("/aws2-sqs-sns/sqs/queues")
+        final String[] queues = listQueues();
+        Assertions.assertTrue(Stream.of(queues).anyMatch(url -> 
url.contains(queueName)));
+
+        final String msg = sendSingleMessageToQueue(queueName);
+        awaitMessageFromQueue(msg, queueName);
+    }
+
+    private String[] listQueues() {
+        return RestAssured.get("/aws2-sqs-sns/sqs/queues")
                 .then()
                 .statusCode(200)
                 .extract()
                 .body().as(String[].class);
-        Assertions.assertTrue(Stream.of(queues).anyMatch(url -> 
url.contains(queueName)));
+    }
+
+    @Test
+    public void sqsDeleteMessage() {
+        final String qName = getPredefinedQueueName();
+        final String msg = sendSingleMessageToQueue(qName);
+        final String receipt = receiveReceiptOfMessageFromQueue(qName);
+        deleteMessageFromQueue(qName, receipt);
+        Assertions.assertNotEquals(receiveMessageFromQueue(qName), msg);
+    }
+
+    private String getPredefinedQueueName() {
+        return ConfigProvider.getConfig().getValue("aws-sqs.queue-name", 
String.class);
+    }
 
+    private String sendSingleMessageToQueue(String queueName) {
         final String msg = "sqs" + UUID.randomUUID().toString().replace("-", 
"");
         RestAssured.given()
                 .contentType(ContentType.TEXT)
                 .body(msg)
-                .post("/aws2-sqs-sns/sqs/send")
+                .post("/aws2-sqs-sns/sqs/send/" + queueName)
                 .then()
                 .statusCode(201);
+        return msg;
+    }
+
+    private String receiveReceiptOfMessageFromQueue(String queueName) {
+        return RestAssured.get("/aws2-sqs-sns/sqs/receive/receipt/" + 
queueName)
+                .then()
+                .statusCode(200)
+                .extract()
+                .body()
+                .asString();
+    }
+
+    private void deleteMessageFromQueue(String queueName, String receipt) {
+        RestAssured.delete("/aws2-sqs-sns/sqs/delete/message/" + queueName + 
"/" + receipt)
+                .then()
+                .statusCode(200);
+    }
+
+    @Test
+    public void sqsAutoCreateDelayedQueue() {
+        final String qName = "delayQueue";
+        final int delay = 10;
+        createDelayQueueAndVerifyExistence("delayQueue", 10);
+        final String msgSent = sendSingleMessageToQueue(qName);
+        Instant start = Instant.now();
+        awaitMessageFromQueue(msgSent, qName);
+        Assertions.assertTrue(Duration.between(start, 
Instant.now()).getSeconds() >= delay);
+        deleteQueue(qName);
+    }
 
+    private void createDelayQueueAndVerifyExistence(String queueName, int 
delay) {
+        final String[] queues = 
RestAssured.get("/aws2-sqs-sns/sqs/queue/autocreate/delayed/" + queueName + "/" 
+ delay)
+                .then()
+                .statusCode(200)
+                .extract()
+                .body()
+                .as(String[].class);
+        Assertions.assertTrue(Stream.of(queues).anyMatch(url -> 
url.contains(queueName)));
+    }
+
+    private void awaitMessageFromQueue(String expectedContent, String 
queueName) {
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, 
TimeUnit.SECONDS).until(
+                () -> receiveMessageFromQueue(queueName),
+                Matchers.is(expectedContent));
+    }
+
+    private void deleteQueue(String queueName) {
+        RestAssured.get("/aws2-sqs-sns/sqs/delete/queue/" + queueName)
+                .then()
+                .statusCode(200);
+        awaitQueueDeleted(queueName);
+    }
+
+    private void awaitQueueDeleted(String queueName) {
         Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, 
TimeUnit.SECONDS).until(
-                () -> 
RestAssured.get("/aws2-sqs-sns/sqs/receive").then().statusCode(200).extract().body().asString(),
-                Matchers.is(msg));
+                () -> 
Stream.of(listQueues()).peek(System.out::println).noneMatch(url -> 
url.contains(queueName)));
+    }
+
+    private String receiveMessageFromQueue(String queueName) {
+        return receiveMessageFromQueue(queueName, true);
+    }
+
+    private String receiveMessageFromQueue(String queueName, boolean 
deleteMessage) {
+        return RestAssured.get("/aws2-sqs-sns/sqs/receive/" + queueName + "/" 
+ deleteMessage)
+                .then()
+                .statusCode(anyOf(is(200), is(204)))
+                .extract()
+                .body()
+                .asString();
+    }
+
+    @Test
+    public void sqsSendBatchMessage() {
+        final List<String> messages = new ArrayList<>(Arrays.asList(
+                "Hello from camel-quarkus",
+                "This is a batch message test",
+                "Let's add few more messages",
+                "Next message will be last",
+                "Goodbye from camel-quarkus"));
+        Assertions.assertEquals(messages.size(), 
sendMessageBatchAndRetrieveSuccessCount(messages));
+    }
+
+    private int sendMessageBatchAndRetrieveSuccessCount(List<String> batch) {
+        return Integer.parseInt(RestAssured.given()
+                .contentType(ContentType.JSON)
+                .body(batch)
+                .post("/aws2-sqs-sns/sqs/batch")
+                .then()
+                .statusCode(201)
+                .extract()
+                .body()
+                .asString());
+    }
+
+    @Test
+    public void sqsPurgeQueue() {
+        final String qName = getPredefinedQueueName();
+        sendSingleMessageToQueue(qName);
+        purgeQueue(qName);
+        awaitAllMessagesDeletedFromQueue(qName);
+    }
+
+    private void purgeQueue(String queueName) {
+        RestAssured.delete("/aws2-sqs-sns/sqs/purge/queue/" + queueName)
+                .then()
+                .statusCode(200);
+    }
 
+    private void awaitAllMessagesDeletedFromQueue(String queueName) {
+        // it can take up to 60 seconds to purge all messages in queue as 
stated in documentation
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(60, 
TimeUnit.SECONDS).until(
+                () -> receiveMessageFromQueue(queueName, false),
+                Matchers.emptyOrNullString());
     }
 
     @Test

Reply via email to