This is an automated email from the ASF dual-hosted git repository. aldettinger pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 01276cdc8d392d82d373b820daae0bddbcd148e9 Author: Vratislav Hais <[email protected]> AuthorDate: Tue Sep 7 10:08:35 2021 +0200 [#2777] Increase test coverage of aws2sqs component --- .../component/aws2/sqs/it/Aws2SqsSnsResource.java | 104 +++++++++++++- .../component/aws2/sqs/it/Aws2SqsSnsTest.java | 150 +++++++++++++++++++-- 2 files changed, 243 insertions(+), 11 deletions(-) diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java index 003528a..1a48c60 100644 --- a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java +++ b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java @@ -22,10 +22,12 @@ import java.util.List; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; @@ -33,8 +35,10 @@ import javax.ws.rs.core.Response; import org.apache.camel.ConsumerTemplate; import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.aws2.sqs.Sqs2Constants; import org.eclipse.microprofile.config.inject.ConfigProperty; import software.amazon.awssdk.services.sqs.model.ListQueuesResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; @Path("/aws2-sqs-sns") @ApplicationScoped @@ -79,11 +83,48 @@ public class Aws2SqsSnsResource { .build(); } - @Path("/sqs/receive") + @Path("/sqs/send/{queueName}") + @POST + @Consumes(MediaType.TEXT_PLAIN) + @Produces(MediaType.TEXT_PLAIN) + public Response sqsSendSpecificQueue(@PathParam("queueName") String queueName, String message) throws Exception { + final String response = producerTemplate.requestBody(componentUri(queueName), message, String.class); + return Response + .created(new URI("https://camel.apache.org/")) + .entity(response) + .build(); + } + + @Path("/sqs/purge/queue/{queueName}") + @DELETE + @Produces(MediaType.TEXT_PLAIN) + public Response purgeQueue(@PathParam("queueName") String queueName) throws Exception { + producerTemplate.sendBodyAndHeader(componentUri(queueName) + "?operation=purgeQueue", + null, + Sqs2Constants.SQS_QUEUE_PREFIX, + queueName); + return Response.ok().build(); + } + + @Path("/sqs/receive/{queueName}/{deleteMessage}") @GET @Produces(MediaType.TEXT_PLAIN) - public String sqsReceive() throws Exception { - return consumerTemplate.receiveBody(componentUri(), 10000, String.class); + public String sqsReceive(@PathParam("queueName") String queueName, @PathParam("deleteMessage") String deleteMessage) + throws Exception { + return consumerTemplate.receiveBody(componentUri(queueName) + + "?deleteAfterRead=" + deleteMessage + "&deleteIfFiltered=" + deleteMessage + "&defaultVisibilityTimeout=1", + 10000, + String.class); + } + + @Path("/sqs/receive/receipt/{queueName}") + @GET + @Produces(MediaType.TEXT_PLAIN) + public String sqsReceipt(@PathParam("queueName") String queueName) throws Exception { + return consumerTemplate.receive(componentUri(queueName), 10000) + .getIn() + .getHeader(Sqs2Constants.RECEIPT_HANDLE) + .toString(); } @Path("/sqs/queues") @@ -94,10 +135,67 @@ public class Aws2SqsSnsResource { .queueUrls(); } + @Path("/sqs/batch") + @POST + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.TEXT_PLAIN) + public Response sendBatchMessage(List<String> messages) throws Exception { + final SendMessageBatchResponse response = producerTemplate.requestBody( + componentUri() + "?operation=sendBatchMessage", + messages, + SendMessageBatchResponse.class); + return Response + .created(new URI("https://camel.apache.org/")) + .entity("" + response.successful().size()) + .build(); + } + + @Path("/sqs/delete/message/{queueName}/{receipt}") + @DELETE + @Produces(MediaType.TEXT_PLAIN) + public Response deleteMessage(@PathParam("queueName") String queueName, @PathParam("receipt") String receipt) + throws Exception { + producerTemplate.sendBodyAndHeader(componentUri(queueName) + "?operation=deleteMessage", + null, + Sqs2Constants.RECEIPT_HANDLE, + receipt); + return Response.ok().build(); + } + + @Path("/sqs/delete/queue/{queueName}") + @DELETE + @Produces(MediaType.TEXT_PLAIN) + public Response deleteQueue(@PathParam("queueName") String queueName) throws Exception { + producerTemplate.sendBodyAndHeader(componentUri(queueName) + "?operation=deleteQueue", + null, + Sqs2Constants.SQS_QUEUE_PREFIX, + queueName); + return Response.ok().build(); + } + + @Path("/sqs/queue/autocreate/delayed/{queueName}/{delay}") + @GET + @Produces(MediaType.APPLICATION_JSON) + public List<String> autoCreateDelayedQueue(@PathParam("queueName") String queueName, @PathParam("delay") String delay) + throws Exception { + // queue creation without any operation resulted in 405 status code + return producerTemplate + .requestBody( + "aws2-sqs://" + queueName + + "?autoCreateQueue=true&delayQueue=true&delaySeconds=" + delay + "&operation=listQueues", + null, + ListQueuesResponse.class) + .queueUrls(); + } + private String componentUri() { return "aws2-sqs://" + queueName; } + private String componentUri(String queueName) { + return "aws2-sqs://" + queueName; + } + @Path("/sns/send") @POST @Consumes(MediaType.TEXT_PLAIN) diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java index 76b39a6..c68c2ad 100644 --- a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java +++ b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java @@ -16,6 +16,11 @@ */ package org.apache.camel.quarkus.component.aws2.sqs.it; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -26,12 +31,12 @@ import io.restassured.RestAssured; import io.restassured.http.ContentType; import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource; import org.awaitility.Awaitility; -import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.core.Is.is; @QuarkusTest @@ -40,28 +45,157 @@ class Aws2SqsSnsTest { @Test public void sqs() { - final Config config = ConfigProvider.getConfig(); - final String queueName = config.getValue("aws-sqs.queue-name", String.class); + final String queueName = getPredefinedQueueName(); - String[] queues = RestAssured.get("/aws2-sqs-sns/sqs/queues") + final String[] queues = listQueues(); + Assertions.assertTrue(Stream.of(queues).anyMatch(url -> url.contains(queueName))); + + final String msg = sendSingleMessageToQueue(queueName); + awaitMessageFromQueue(msg, queueName); + } + + private String[] listQueues() { + return RestAssured.get("/aws2-sqs-sns/sqs/queues") .then() .statusCode(200) .extract() .body().as(String[].class); - Assertions.assertTrue(Stream.of(queues).anyMatch(url -> url.contains(queueName))); + } + + @Test + public void sqsDeleteMessage() { + final String qName = getPredefinedQueueName(); + final String msg = sendSingleMessageToQueue(qName); + final String receipt = receiveReceiptOfMessageFromQueue(qName); + deleteMessageFromQueue(qName, receipt); + Assertions.assertNotEquals(receiveMessageFromQueue(qName), msg); + } + + private String getPredefinedQueueName() { + return ConfigProvider.getConfig().getValue("aws-sqs.queue-name", String.class); + } + private String sendSingleMessageToQueue(String queueName) { final String msg = "sqs" + UUID.randomUUID().toString().replace("-", ""); RestAssured.given() .contentType(ContentType.TEXT) .body(msg) - .post("/aws2-sqs-sns/sqs/send") + .post("/aws2-sqs-sns/sqs/send/" + queueName) .then() .statusCode(201); + return msg; + } + + private String receiveReceiptOfMessageFromQueue(String queueName) { + return RestAssured.get("/aws2-sqs-sns/sqs/receive/receipt/" + queueName) + .then() + .statusCode(200) + .extract() + .body() + .asString(); + } + + private void deleteMessageFromQueue(String queueName, String receipt) { + RestAssured.delete("/aws2-sqs-sns/sqs/delete/message/" + queueName + "/" + receipt) + .then() + .statusCode(200); + } + + @Test + public void sqsAutoCreateDelayedQueue() { + final String qName = "delayQueue"; + final int delay = 10; + createDelayQueueAndVerifyExistence("delayQueue", 10); + final String msgSent = sendSingleMessageToQueue(qName); + Instant start = Instant.now(); + awaitMessageFromQueue(msgSent, qName); + Assertions.assertTrue(Duration.between(start, Instant.now()).getSeconds() >= delay); + deleteQueue(qName); + } + private void createDelayQueueAndVerifyExistence(String queueName, int delay) { + final String[] queues = RestAssured.get("/aws2-sqs-sns/sqs/queue/autocreate/delayed/" + queueName + "/" + delay) + .then() + .statusCode(200) + .extract() + .body() + .as(String[].class); + Assertions.assertTrue(Stream.of(queues).anyMatch(url -> url.contains(queueName))); + } + + private void awaitMessageFromQueue(String expectedContent, String queueName) { + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until( + () -> receiveMessageFromQueue(queueName), + Matchers.is(expectedContent)); + } + + private void deleteQueue(String queueName) { + RestAssured.get("/aws2-sqs-sns/sqs/delete/queue/" + queueName) + .then() + .statusCode(200); + awaitQueueDeleted(queueName); + } + + private void awaitQueueDeleted(String queueName) { Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until( - () -> RestAssured.get("/aws2-sqs-sns/sqs/receive").then().statusCode(200).extract().body().asString(), - Matchers.is(msg)); + () -> Stream.of(listQueues()).peek(System.out::println).noneMatch(url -> url.contains(queueName))); + } + + private String receiveMessageFromQueue(String queueName) { + return receiveMessageFromQueue(queueName, true); + } + + private String receiveMessageFromQueue(String queueName, boolean deleteMessage) { + return RestAssured.get("/aws2-sqs-sns/sqs/receive/" + queueName + "/" + deleteMessage) + .then() + .statusCode(anyOf(is(200), is(204))) + .extract() + .body() + .asString(); + } + + @Test + public void sqsSendBatchMessage() { + final List<String> messages = new ArrayList<>(Arrays.asList( + "Hello from camel-quarkus", + "This is a batch message test", + "Let's add few more messages", + "Next message will be last", + "Goodbye from camel-quarkus")); + Assertions.assertEquals(messages.size(), sendMessageBatchAndRetrieveSuccessCount(messages)); + } + + private int sendMessageBatchAndRetrieveSuccessCount(List<String> batch) { + return Integer.parseInt(RestAssured.given() + .contentType(ContentType.JSON) + .body(batch) + .post("/aws2-sqs-sns/sqs/batch") + .then() + .statusCode(201) + .extract() + .body() + .asString()); + } + + @Test + public void sqsPurgeQueue() { + final String qName = getPredefinedQueueName(); + sendSingleMessageToQueue(qName); + purgeQueue(qName); + awaitAllMessagesDeletedFromQueue(qName); + } + + private void purgeQueue(String queueName) { + RestAssured.delete("/aws2-sqs-sns/sqs/purge/queue/" + queueName) + .then() + .statusCode(200); + } + private void awaitAllMessagesDeletedFromQueue(String queueName) { + // it can take up to 60 seconds to purge all messages in queue as stated in documentation + Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(60, TimeUnit.SECONDS).until( + () -> receiveMessageFromQueue(queueName, false), + Matchers.emptyOrNullString()); } @Test
