This is an automated email from the ASF dual-hosted git repository.

ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 2379e61  Test sending messages to an SNS FIFO topic #2625
2379e61 is described below

commit 2379e615d888a72cd9279bbb02d851be57238a91
Author: JiriOndrusek <[email protected]>
AuthorDate: Wed Jul 28 14:46:00 2021 +0200

    Test sending messages to an SNS FIFO topic #2625
---
 .../component/aws2/sqs/it/Aws2SqsSnsResource.java  |  25 +++-
 .../component/aws2/sqs/it/Aws2SqsSnsTest.java      |  18 +++
 .../aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java   | 150 ++++++++++++---------
 3 files changed, 125 insertions(+), 68 deletions(-)

diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
index 228740b..003528a 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
@@ -22,6 +22,7 @@ import java.util.List;
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -51,6 +52,15 @@ public class Aws2SqsSnsResource {
     @ConfigProperty(name = "aws-sns.topic-name")
     String topicName;
 
+    @ConfigProperty(name = "aws-sqs.sns-fifo-receiver-queue-name")
+    String snsFifoReceiverQueueName;
+
+    @ConfigProperty(name = "aws2-sqs.sns-fifo-receiver-queue-arn")
+    String snsFifoReceiverQueueArn;
+
+    @ConfigProperty(name = "aws-sns-fifo.topic-name")
+    String fifoTopicName;
+
     @Inject
     ProducerTemplate producerTemplate;
 
@@ -92,10 +102,15 @@ public class Aws2SqsSnsResource {
     @POST
     @Consumes(MediaType.TEXT_PLAIN)
     @Produces(MediaType.TEXT_PLAIN)
-    public Response snsSend(String message, @QueryParam("queueUrl") String queueUrl) throws Exception {
+    public Response snsSend(String message,
+            @QueryParam("queueUrl") String queueUrl,
+            @DefaultValue("false") @QueryParam("fifo") boolean fifo) throws Exception {
 
         final String response = producerTemplate.requestBody(
-                "aws2-sns://" + topicName + "?subscribeSNStoSQS=true&queueUrl=RAW(" + snsReceiverQueueArn + ")", message,
+                String.format("aws2-sns://%s?subscribeSNStoSQS=true&queueUrl=RAW(%s)%s",
+                        fifo ? fifoTopicName : topicName, fifo ? snsFifoReceiverQueueArn : snsReceiverQueueArn,
+                        fifo ? "&messageGroupIdStrategy=useExchangeId" : ""),
+                message,
                 String.class);
         return Response
                 .created(new URI("https://camel.apache.org/"))
@@ -110,4 +125,10 @@ public class Aws2SqsSnsResource {
         return consumerTemplate.receiveBody("aws2-sqs://" + snsReceiverQueueName, 10000, String.class);
     }
 
+    @Path("/snsFifo/receiveViaSqs")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public String fifoSqsReceiveViaSqs() throws Exception {
+        return consumerTemplate.receiveBody("aws2-sqs://" + snsFifoReceiverQueueName, 10000, String.class);
+    }
 }
diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
index 1363775..76b39a6 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
@@ -82,4 +82,22 @@ class Aws2SqsSnsTest {
 
     }
 
+    @Test
+    void snsFifo() {
+        final String snsMsg = "snsFifo" + UUID.randomUUID().toString().replace("-", "");
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .queryParam("fifo", true)
+                .body(snsMsg)
+                .post("/aws2-sqs-sns/sns/send")
+                .then()
+                .statusCode(201);
+
+        RestAssured
+                .get("/aws2-sqs-sns/snsFifo/receiveViaSqs")
+                .then()
+                .statusCode(200)
+                .body("Message", is(snsMsg));
+    }
+
 }
diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java
index fb9d980..a69e3d8 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java
@@ -61,73 +61,91 @@ public class Aws2SqsSnsTestEnvCustomizer implements Aws2TestEnvCustomizer {
         }
 
         /* SNS */
-        {
-            final String topicName = "camel-quarkus-" + RandomStringUtils.randomAlphanumeric(49).toLowerCase(Locale.ROOT);
-            envContext.property("aws-sns.topic-name", topicName);
-
-            final SnsClient snsClient = envContext.client(Service.SNS, SnsClient::builder);
-
-            final String topicArn = snsClient.createTopic(CreateTopicRequest.builder().name(topicName).build()).topicArn();
-
-            envContext.closeable(() -> {
-                snsClient.listSubscriptionsByTopic(ListSubscriptionsByTopicRequest.builder().topicArn(topicArn).build())
-                        .subscriptions()
-                        .stream()
-                        .map(Subscription::subscriptionArn)
-                        .forEach(arn -> snsClient.unsubscribe(UnsubscribeRequest.builder().subscriptionArn(arn).build()));
-                snsClient.deleteTopic(DeleteTopicRequest.builder().topicArn(topicArn).build());
-            });
-
-            final String snsReceiverQueueName = "camel-quarkus-sns-receiver-"
-                    + RandomStringUtils.randomAlphanumeric(30).toLowerCase(Locale.ROOT);
-            envContext.property("aws-sqs.sns-receiver-queue-name", snsReceiverQueueName);
-            final String snsReceiverQueueUrl = sqsClient.createQueue(
-                    CreateQueueRequest.builder()
-                            .queueName(snsReceiverQueueName)
-                            .build())
-                    .queueUrl();
-            envContext.property("aws2-sqs.sns-receiver-queue-url", snsReceiverQueueUrl);
-
-            /*
-             * We need queue ARN instead of queue URL when creating a subscription of an SQS Queue to an SNS Topic
-             * See https://stackoverflow.com/a/59255978
-             */
-            final String snsReceiverQueueArn = sqsClient.getQueueAttributes(
-                    GetQueueAttributesRequest.builder()
-                            .queueUrl(snsReceiverQueueUrl)
-                            .attributeNamesWithStrings("All")
-                            .build())
-                    .attributesAsStrings()
-                    .get("QueueArn");
-            envContext.property("aws2-sqs.sns-receiver-queue-arn", snsReceiverQueueArn);
-
-            final String policy = "{"
-                    + "  \"Version\": \"2008-10-17\","
-                    + "  \"Id\": \"policy-" + snsReceiverQueueName + "\","
-                    + "  \"Statement\": ["
-                    + "    {"
-                    + "      \"Sid\": \"sid-" + snsReceiverQueueName + "\","
-                    + "      \"Effect\": \"Allow\","
-                    + "      \"Principal\": {"
-                    + "        \"AWS\": \"*\""
-                    + "      },"
-                    + "      \"Action\": \"SQS:*\","
-                    + "      \"Resource\": \"" + snsReceiverQueueArn + "\""
-                    + "    }"
-                    + "  ]"
-                    + "}";
-            sqsClient.setQueueAttributes(
-                    SetQueueAttributesRequest.builder()
-                            .queueUrl(snsReceiverQueueUrl)
-                            .attributes(
-                                    Collections.singletonMap(
-                                            QueueAttributeName.POLICY,
-                                            policy))
-                            .build());
-
-            envContext
-                    .closeable(() -> sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(snsReceiverQueueUrl).build()));
+        customizeSns(envContext, sqsClient, false);
+        customizeSns(envContext, sqsClient, true);
+    }
+
+    private void customizeSns(Aws2TestEnvContext envContext, SqsClient sqsClient, boolean fifo) {
+        final String topicName = "camel-quarkus-" + RandomStringUtils.randomAlphanumeric(49).toLowerCase(Locale.ROOT)
+                + (fifo ? ".fifo" : "");
+        envContext.property(fifo ? "aws-sns-fifo.topic-name" : "aws-sns.topic-name", topicName);
+
+        final SnsClient snsClient = envContext.client(Service.SNS, SnsClient::builder);
+
+        CreateTopicRequest.Builder topicRequestBuilder = CreateTopicRequest.builder()
+                .name(topicName);
+        if (fifo) {
+            topicRequestBuilder.attributes(Collections.singletonMap("FifoTopic", Boolean.TRUE.toString()));
+        }
+
+        final String topicArn = snsClient.createTopic(topicRequestBuilder.build()).topicArn();
+
+        envContext.closeable(() -> {
+            snsClient.listSubscriptionsByTopic(ListSubscriptionsByTopicRequest.builder().topicArn(topicArn).build())
+                    .subscriptions()
+                    .stream()
+                    .map(Subscription::subscriptionArn)
+                    .forEach(arn -> snsClient.unsubscribe(UnsubscribeRequest.builder().subscriptionArn(arn).build()));
+            snsClient.deleteTopic(DeleteTopicRequest.builder().topicArn(topicArn).build());
+        });
+
+        final String snsReceiverQueueName = "camel-quarkus-sns-receiver-"
+                + RandomStringUtils.randomAlphanumeric(30).toLowerCase(Locale.ROOT) + (fifo ? ".fifo" : "");
+        ;
+        envContext.property(fifo ? "aws-sqs.sns-fifo-receiver-queue-name" : "aws-sqs.sns-receiver-queue-name",
+                snsReceiverQueueName);
+        CreateQueueRequest.Builder createQueueRequestBuilder = CreateQueueRequest.builder()
+                .queueName(snsReceiverQueueName);
+        if (fifo) {
+            createQueueRequestBuilder
+                    .attributes(Collections.singletonMap(QueueAttributeName.FIFO_QUEUE, Boolean.TRUE.toString()));
         }
 
+        final String snsReceiverQueueUrl = sqsClient.createQueue(
+                createQueueRequestBuilder.build())
+                .queueUrl();
+        envContext.property(fifo ? "aws2-sqs.sns-fifo-receiver-queue-url" : "aws2-sqs.sns-receiver-queue-url",
+                snsReceiverQueueUrl);
+
+        /*
+         * We need queue ARN instead of queue URL when creating a subscription of an SQS Queue to an SNS Topic
+         * See https://stackoverflow.com/a/59255978
+         */
+        final String snsReceiverQueueArn = sqsClient.getQueueAttributes(
+                GetQueueAttributesRequest.builder()
+                        .queueUrl(snsReceiverQueueUrl)
+                        .attributeNamesWithStrings("All")
+                        .build())
+                .attributesAsStrings()
+                .get("QueueArn");
+        envContext.property(fifo ? "aws2-sqs.sns-fifo-receiver-queue-arn" : "aws2-sqs.sns-receiver-queue-arn",
+                snsReceiverQueueArn);
+
+        final String policy = "{"
+                + "  \"Version\": \"2008-10-17\","
+                + "  \"Id\": \"policy-" + snsReceiverQueueName + "\","
+                + "  \"Statement\": ["
+                + "    {"
+                + "      \"Sid\": \"sid-" + snsReceiverQueueName + "\","
+                + "      \"Effect\": \"Allow\","
+                + "      \"Principal\": {"
+                + "        \"AWS\": \"*\""
+                + "      },"
+                + "      \"Action\": \"SQS:*\","
+                + "      \"Resource\": \"" + snsReceiverQueueArn + "\""
+                + "    }"
+                + "  ]"
+                + "}";
+        sqsClient.setQueueAttributes(
+                SetQueueAttributesRequest.builder()
+                        .queueUrl(snsReceiverQueueUrl)
+                        .attributes(
+                                Collections.singletonMap(
+                                        QueueAttributeName.POLICY,
+                                        policy))
+                        .build());
+
+        envContext
+                .closeable(() -> sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(snsReceiverQueueUrl).build()));
     }
 }

Reply via email to