This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 2379e61 Test sending messages to an SNS FIFO topic #2625
2379e61 is described below
commit 2379e615d888a72cd9279bbb02d851be57238a91
Author: JiriOndrusek <[email protected]>
AuthorDate: Wed Jul 28 14:46:00 2021 +0200
Test sending messages to an SNS FIFO topic #2625
---
.../component/aws2/sqs/it/Aws2SqsSnsResource.java | 25 +++-
.../component/aws2/sqs/it/Aws2SqsSnsTest.java | 18 +++
.../aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java | 150 ++++++++++++---------
3 files changed, 125 insertions(+), 68 deletions(-)
diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
index 228740b..003528a 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsResource.java
@@ -22,6 +22,7 @@ import java.util.List;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -51,6 +52,15 @@ public class Aws2SqsSnsResource {
@ConfigProperty(name = "aws-sns.topic-name")
String topicName;
+ @ConfigProperty(name = "aws-sqs.sns-fifo-receiver-queue-name")
+ String snsFifoReceiverQueueName;
+
+ @ConfigProperty(name = "aws2-sqs.sns-fifo-receiver-queue-arn")
+ String snsFifoReceiverQueueArn;
+
+ @ConfigProperty(name = "aws-sns-fifo.topic-name")
+ String fifoTopicName;
+
@Inject
ProducerTemplate producerTemplate;
@@ -92,10 +102,15 @@ public class Aws2SqsSnsResource {
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
- public Response snsSend(String message, @QueryParam("queueUrl") String
queueUrl) throws Exception {
+ public Response snsSend(String message,
+ @QueryParam("queueUrl") String queueUrl,
+ @DefaultValue("false") @QueryParam("fifo") boolean fifo) throws
Exception {
final String response = producerTemplate.requestBody(
- "aws2-sns://" + topicName +
"?subscribeSNStoSQS=true&queueUrl=RAW(" + snsReceiverQueueArn + ")", message,
+
String.format("aws2-sns://%s?subscribeSNStoSQS=true&queueUrl=RAW(%s)%s",
+ fifo ? fifoTopicName : topicName, fifo ?
snsFifoReceiverQueueArn : snsReceiverQueueArn,
+ fifo ? "&messageGroupIdStrategy=useExchangeId" : ""),
+ message,
String.class);
return Response
.created(new URI("https://camel.apache.org/"))
@@ -110,4 +125,10 @@ public class Aws2SqsSnsResource {
return consumerTemplate.receiveBody("aws2-sqs://" +
snsReceiverQueueName, 10000, String.class);
}
+ @Path("/snsFifo/receiveViaSqs")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public String fifoSqsReceiveViaSqs() throws Exception {
+ return consumerTemplate.receiveBody("aws2-sqs://" +
snsFifoReceiverQueueName, 10000, String.class);
+ }
}
diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
index 1363775..76b39a6 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTest.java
@@ -82,4 +82,22 @@ class Aws2SqsSnsTest {
}
+ @Test
+ void snsFifo() {
+ final String snsMsg = "snsFifo" +
UUID.randomUUID().toString().replace("-", "");
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .queryParam("fifo", true)
+ .body(snsMsg)
+ .post("/aws2-sqs-sns/sns/send")
+ .then()
+ .statusCode(201);
+
+ RestAssured
+ .get("/aws2-sqs-sns/snsFifo/receiveViaSqs")
+ .then()
+ .statusCode(200)
+ .body("Message", is(snsMsg));
+ }
+
}
diff --git a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java
index fb9d980..a69e3d8 100644
--- a/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java
+++ b/integration-test-groups/aws2/aws2-sqs-sns/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsSnsTestEnvCustomizer.java
@@ -61,73 +61,91 @@ public class Aws2SqsSnsTestEnvCustomizer implements Aws2TestEnvCustomizer {
}
/* SNS */
- {
- final String topicName = "camel-quarkus-" +
RandomStringUtils.randomAlphanumeric(49).toLowerCase(Locale.ROOT);
- envContext.property("aws-sns.topic-name", topicName);
-
- final SnsClient snsClient = envContext.client(Service.SNS,
SnsClient::builder);
-
- final String topicArn =
snsClient.createTopic(CreateTopicRequest.builder().name(topicName).build()).topicArn();
-
- envContext.closeable(() -> {
-
snsClient.listSubscriptionsByTopic(ListSubscriptionsByTopicRequest.builder().topicArn(topicArn).build())
- .subscriptions()
- .stream()
- .map(Subscription::subscriptionArn)
- .forEach(arn ->
snsClient.unsubscribe(UnsubscribeRequest.builder().subscriptionArn(arn).build()));
-
snsClient.deleteTopic(DeleteTopicRequest.builder().topicArn(topicArn).build());
- });
-
- final String snsReceiverQueueName = "camel-quarkus-sns-receiver-"
- +
RandomStringUtils.randomAlphanumeric(30).toLowerCase(Locale.ROOT);
- envContext.property("aws-sqs.sns-receiver-queue-name",
snsReceiverQueueName);
- final String snsReceiverQueueUrl = sqsClient.createQueue(
- CreateQueueRequest.builder()
- .queueName(snsReceiverQueueName)
- .build())
- .queueUrl();
- envContext.property("aws2-sqs.sns-receiver-queue-url",
snsReceiverQueueUrl);
-
- /*
- * We need queue ARN instead of queue URL when creating a
subscription of an SQS Queue to an SNS Topic
- * See https://stackoverflow.com/a/59255978
- */
- final String snsReceiverQueueArn = sqsClient.getQueueAttributes(
- GetQueueAttributesRequest.builder()
- .queueUrl(snsReceiverQueueUrl)
- .attributeNamesWithStrings("All")
- .build())
- .attributesAsStrings()
- .get("QueueArn");
- envContext.property("aws2-sqs.sns-receiver-queue-arn",
snsReceiverQueueArn);
-
- final String policy = "{"
- + " \"Version\": \"2008-10-17\","
- + " \"Id\": \"policy-" + snsReceiverQueueName + "\","
- + " \"Statement\": ["
- + " {"
- + " \"Sid\": \"sid-" + snsReceiverQueueName + "\","
- + " \"Effect\": \"Allow\","
- + " \"Principal\": {"
- + " \"AWS\": \"*\""
- + " },"
- + " \"Action\": \"SQS:*\","
- + " \"Resource\": \"" + snsReceiverQueueArn + "\""
- + " }"
- + " ]"
- + "}";
- sqsClient.setQueueAttributes(
- SetQueueAttributesRequest.builder()
- .queueUrl(snsReceiverQueueUrl)
- .attributes(
- Collections.singletonMap(
- QueueAttributeName.POLICY,
- policy))
- .build());
-
- envContext
- .closeable(() ->
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(snsReceiverQueueUrl).build()));
+ customizeSns(envContext, sqsClient, false);
+ customizeSns(envContext, sqsClient, true);
+ }
+
+ private void customizeSns(Aws2TestEnvContext envContext, SqsClient
sqsClient, boolean fifo) {
+ final String topicName = "camel-quarkus-" +
RandomStringUtils.randomAlphanumeric(49).toLowerCase(Locale.ROOT)
+ + (fifo ? ".fifo" : "");
+ envContext.property(fifo ? "aws-sns-fifo.topic-name" :
"aws-sns.topic-name", topicName);
+
+ final SnsClient snsClient = envContext.client(Service.SNS,
SnsClient::builder);
+
+ CreateTopicRequest.Builder topicRequestBuilder =
CreateTopicRequest.builder()
+ .name(topicName);
+ if (fifo) {
+
topicRequestBuilder.attributes(Collections.singletonMap("FifoTopic",
Boolean.TRUE.toString()));
+ }
+
+ final String topicArn =
snsClient.createTopic(topicRequestBuilder.build()).topicArn();
+
+ envContext.closeable(() -> {
+
snsClient.listSubscriptionsByTopic(ListSubscriptionsByTopicRequest.builder().topicArn(topicArn).build())
+ .subscriptions()
+ .stream()
+ .map(Subscription::subscriptionArn)
+ .forEach(arn ->
snsClient.unsubscribe(UnsubscribeRequest.builder().subscriptionArn(arn).build()));
+
snsClient.deleteTopic(DeleteTopicRequest.builder().topicArn(topicArn).build());
+ });
+
+ final String snsReceiverQueueName = "camel-quarkus-sns-receiver-"
+ +
RandomStringUtils.randomAlphanumeric(30).toLowerCase(Locale.ROOT) + (fifo ?
".fifo" : "");
+ ;
+ envContext.property(fifo ? "aws-sqs.sns-fifo-receiver-queue-name" :
"aws-sqs.sns-receiver-queue-name",
+ snsReceiverQueueName);
+ CreateQueueRequest.Builder createQueueRequestBuilder =
CreateQueueRequest.builder()
+ .queueName(snsReceiverQueueName);
+ if (fifo) {
+ createQueueRequestBuilder
+
.attributes(Collections.singletonMap(QueueAttributeName.FIFO_QUEUE,
Boolean.TRUE.toString()));
}
+ final String snsReceiverQueueUrl = sqsClient.createQueue(
+ createQueueRequestBuilder.build())
+ .queueUrl();
+ envContext.property(fifo ? "aws2-sqs.sns-fifo-receiver-queue-url" :
"aws2-sqs.sns-receiver-queue-url",
+ snsReceiverQueueUrl);
+
+ /*
+ * We need queue ARN instead of queue URL when creating a subscription
of an SQS Queue to an SNS Topic
+ * See https://stackoverflow.com/a/59255978
+ */
+ final String snsReceiverQueueArn = sqsClient.getQueueAttributes(
+ GetQueueAttributesRequest.builder()
+ .queueUrl(snsReceiverQueueUrl)
+ .attributeNamesWithStrings("All")
+ .build())
+ .attributesAsStrings()
+ .get("QueueArn");
+ envContext.property(fifo ? "aws2-sqs.sns-fifo-receiver-queue-arn" :
"aws2-sqs.sns-receiver-queue-arn",
+ snsReceiverQueueArn);
+
+ final String policy = "{"
+ + " \"Version\": \"2008-10-17\","
+ + " \"Id\": \"policy-" + snsReceiverQueueName + "\","
+ + " \"Statement\": ["
+ + " {"
+ + " \"Sid\": \"sid-" + snsReceiverQueueName + "\","
+ + " \"Effect\": \"Allow\","
+ + " \"Principal\": {"
+ + " \"AWS\": \"*\""
+ + " },"
+ + " \"Action\": \"SQS:*\","
+ + " \"Resource\": \"" + snsReceiverQueueArn + "\""
+ + " }"
+ + " ]"
+ + "}";
+ sqsClient.setQueueAttributes(
+ SetQueueAttributesRequest.builder()
+ .queueUrl(snsReceiverQueueUrl)
+ .attributes(
+ Collections.singletonMap(
+ QueueAttributeName.POLICY,
+ policy))
+ .build());
+
+ envContext
+ .closeable(() ->
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(snsReceiverQueueUrl).build()));
}
}