This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push:
new af048b64680 CAMEL-17860 add tests in camel-aws2-sqs-starter (#505)
af048b64680 is described below
commit af048b646807db11a0c22a92e5a75e60c7c871b2
Author: JiriOndrusek <[email protected]>
AuthorDate: Wed Apr 6 09:16:52 2022 +0200
CAMEL-17860 add tests in camel-aws2-sqs-starter (#505)
---
components-starter/camel-aws2-sqs-starter/pom.xml | 12 ++
.../apache/camel/component/aws2/sqs/BaseSqs.java | 70 ++++++++
.../component/aws2/sqs/JmsStyleSelectorTest.java | 100 +++++++++++
.../camel/component/aws2/sqs/SqsComponentTest.java | 128 ++++++++++++++
.../component/aws2/sqs/SqsDeadletterTest.java | 96 ++++++++++
.../component/aws2/sqs/SqsDelayedQueueTest.java | 83 +++++++++
.../component/aws2/sqs/SqsOperationsTest.java | 194 +++++++++++++++++++++
.../aws2/sqs/SqsProducerAutoCreateQueueTest.java | 112 ++++++++++++
.../org/apache/camel/component/aws2/sqs/policy.txt | 1 +
9 files changed, 796 insertions(+)
diff --git a/components-starter/camel-aws2-sqs-starter/pom.xml
b/components-starter/camel-aws2-sqs-starter/pom.xml
index 9600327f074..8d1eebf2070 100644
--- a/components-starter/camel-aws2-sqs-starter/pom.xml
+++ b/components-starter/camel-aws2-sqs-starter/pom.xml
@@ -47,6 +47,18 @@
</exclusions>
<!--END OF GENERATED CODE-->
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-infra-aws-v2</artifactId>
+ <version>${camel-version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
<!--START OF GENERATED CODE-->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/BaseSqs.java
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/BaseSqs.java
new file mode 100644
index 00000000000..c05ff0076a0
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/BaseSqs.java
@@ -0,0 +1,70 @@
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Configuration;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.camel.test.infra.common.SharedNameGenerator;
+import org.apache.camel.test.infra.common.TestEntityNameGenerator;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import software.amazon.awssdk.services.sqs.SqsClient;
+
+import java.util.UUID;
+
+/**
+ * Common base class for the camel-aws2-sqs-starter tests.
+ * <p>
+ * It exposes the autowired Camel context and templates, registers the AWS test service and the shared
+ * name generator as JUnit extensions, and offers small helpers for sending a message to a queue and
+ * receiving one back.
+ */
+public class BaseSqs {
+
+    /** Timeout, in milliseconds, used when polling a queue for a single message. */
+    private static final long RECEIVE_TIMEOUT_MILLIS = 10000;
+
+    @Autowired
+    protected CamelContext context;
+
+    @Autowired
+    protected ProducerTemplate producerTemplate;
+
+    @Autowired
+    protected ConsumerTemplate consumerTemplate;
+
+    // Every test in this hierarchy only uses aws2-sqs endpoints, so the SQS test service is registered.
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createSQSService();
+
+    @RegisterExtension
+    public static SharedNameGenerator sharedNameGenerator = new TestEntityNameGenerator();
+
+    /**
+     * Asserts that the expectations of all mock endpoints in the Camel context are met.
+     *
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    protected void assertMockEndpointsSatisfied() throws InterruptedException {
+        MockEndpoint.assertIsSatisfied(this.context);
+    }
+
+    /**
+     * Closes the shared AWS test service once all tests of the class have run.
+     * <p>
+     * Package-private rather than private: JUnit Jupiter documents that lifecycle methods must not be private.
+     */
+    @AfterAll
+    static void closeClient() {
+        service.close();
+    }
+
+    /**
+     * Sends one message with a random body ({@code "sqs"} followed by a dash-less UUID) to the given queue.
+     *
+     * @param  queueName name of the target SQS queue
+     * @return           the reply body of the request converted to {@code String}; callers in this module
+     *                   compare it with the body later received from the queue
+     */
+    String sendSingleMessageToQueue(String queueName) {
+        final String msg = "sqs" + UUID.randomUUID().toString().replace("-", "");
+        return producerTemplate.requestBody("aws2-sqs://" + queueName, msg, String.class);
+    }
+
+    /**
+     * Polls the given queue for a single message, waiting at most {@link #RECEIVE_TIMEOUT_MILLIS} ms.
+     *
+     * @param  queueName     name of the SQS queue to poll
+     * @param  deleteMessage value passed to both the {@code deleteAfterRead} and {@code deleteIfFiltered}
+     *                       endpoint options
+     * @return               the received body as {@code String}; callers treat {@code null} as
+     *                       "nothing received"
+     */
+    String receiveMessageFromQueue(String queueName, boolean deleteMessage) {
+        final String uri = String.format(
+                "aws2-sqs://%s?deleteAfterRead=%s&deleteIfFiltered=%s&defaultVisibilityTimeout=0",
+                queueName, deleteMessage, deleteMessage);
+        return consumerTemplate.receiveBody(uri, RECEIVE_TIMEOUT_MILLIS, String.class);
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    /**
+     * Base configuration that contributes the {@link SqsClient} bean used by the tests.
+     */
+    @Configuration
+    public static class TestConfiguration {
+
+        /**
+         * Creates the SQS client through the test-infra client utilities.
+         *
+         * @param  context the Camel context; not used by the method body
+         * @return         a new SQS client
+         */
+        @Bean
+        public SqsClient sqsClient(CamelContext context) {
+            return AWSSDKClientUtils.newSQSClient();
+        }
+    }
+}
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/JmsStyleSelectorTest.java
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/JmsStyleSelectorTest.java
new file mode 100644
index 00000000000..45e5c861293
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/JmsStyleSelectorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Checks that only messages passing the route filter reach {@code mock:result}.
+ * <p>
+ * Based on SqsConsumerMessageLocalstackIT
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                JmsStyleSelectorTest.class,
+                JmsStyleSelectorTest.TestConfiguration.class
+        }
+)
+public class JmsStyleSelectorTest extends BaseSqs {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    /**
+     * Sends two in-only messages; the route drops the body {@code "ignore"}, so exactly one message
+     * is expected at the mock endpoint.
+     */
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(1);
+
+        sendInOnlyToStart("ignore");
+        sendInOnlyToStart("test1");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /** Sends {@code body} to {@code direct:start} using the in-only exchange pattern. */
+    private void sendInOnlyToStart(String body) {
+        template.send("direct:start", ExchangePattern.InOnly, exchange -> exchange.getIn().setBody(body));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    /**
+     * Route configuration: a producer route into the queue and a filtering consumer route out of it.
+     */
+    @Configuration
+    public class TestConfiguration extends BaseSqs.TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    // Producer side: forwards direct:start to the queue named by the shared name generator.
+                    from("direct:start")
+                            .startupOrder(2)
+                            .toF("aws2-sqs://%s?autoCreateQueue=true", sharedNameGenerator.getName());
+
+                    // Consumer side: starts first and only forwards bodies different from 'ignore'.
+                    fromF("aws2-sqs://%s?deleteAfterRead=false&deleteIfFiltered=true&autoCreateQueue=true",
+                            sharedNameGenerator.getName())
+                            .startupOrder(1)
+                            .filter(simple("${body} != 'ignore'"))
+                            .log("${body}")
+                            .log("${header.CamelAwsSqsReceiptHandle}")
+                            .to("mock:result");
+                }
+            };
+        }
+    }
+
+}
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsComponentTest.java
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsComponentTest.java
new file mode 100644
index 00000000000..52915ff9251
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsComponentTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Round-trip test: a message sent through {@code direct:start} is written to an SQS queue, consumed
+ * again and delivered to {@code mock:result}, where its body and SQS headers are checked.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsComponentTest.class,
+                SqsComponentTest.TestConfiguration.class
+        }
+)
+public class SqsComponentTest extends BaseSqs {
+
+    /** Body sent by both tests. */
+    private static final String MESSAGE_BODY = "This is my message text.";
+
+    /** Value expected in the {@code MD5_OF_BODY} header for {@link #MESSAGE_BODY}. */
+    private static final String MESSAGE_BODY_MD5 = "6a1559560f67c5e7a7d5d838bf0272ee";
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendInOnly() throws Exception {
+        // Both tests share one Spring context, so exchanges recorded by the other test must be discarded first.
+        result.reset();
+        result.expectedMessageCount(1);
+
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOnly,
+                e -> e.getIn().setBody(MESSAGE_BODY));
+
+        assertMockEndpointsSatisfied();
+        assertReceivedMessage(result.getExchanges().get(0));
+
+        assertNotNull(exchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
+        assertEquals(MESSAGE_BODY_MD5, exchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
+    }
+
+    @Test
+    public void sendInOut() throws Exception {
+        // Both tests share one Spring context, so exchanges recorded by the other test must be discarded first.
+        result.reset();
+        result.expectedMessageCount(1);
+
+        Exchange exchange = template.send("direct:start", ExchangePattern.InOut,
+                e -> e.getIn().setBody(MESSAGE_BODY));
+
+        assertMockEndpointsSatisfied();
+        assertReceivedMessage(result.getExchanges().get(0));
+
+        assertNotNull(exchange.getMessage().getHeader(Sqs2Constants.MESSAGE_ID));
+        assertEquals(MESSAGE_BODY_MD5, exchange.getMessage().getHeader(Sqs2Constants.MD5_OF_BODY));
+    }
+
+    /**
+     * Checks body and SQS headers of an exchange delivered to {@code mock:result}.
+     *
+     * @param resultExchange the exchange recorded by the mock endpoint
+     */
+    private static void assertReceivedMessage(Exchange resultExchange) {
+        assertEquals(MESSAGE_BODY, resultExchange.getIn().getBody());
+        assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
+        assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE));
+        assertEquals(MESSAGE_BODY_MD5, resultExchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
+        assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.ATTRIBUTES));
+        assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    /**
+     * Route configuration: {@code direct:start} feeds the queue, the same queue endpoint feeds
+     * {@code mock:result}.
+     */
+    @Configuration
+    public class TestConfiguration extends BaseSqs.TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            final String sqsEndpointUri = String
+                    .format("aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+                            sharedNameGenerator.getName(),
+                            "1209600", "65536", "60",
+                            "file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:start").to(sqsEndpointUri);
+
+                    from(sqsEndpointUri).to("mock:result");
+                }
+            };
+        }
+    }
+
+}
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDeadletterTest.java
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDeadletterTest.java
new file mode 100644
index 00000000000..a6d4a084526
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDeadletterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+
+/**
+ * Verifies that a message whose processing fails is routed by the dead letter channel to a separate
+ * SQS queue, from which it is consumed and delivered to {@code mock:result}.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsDeadletterTest.class,
+                SqsDeadletterTest.TestConfiguration.class
+        }
+)
+public class SqsDeadletterTest extends BaseSqs {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    /**
+     * Sends one message into a route that always throws and expects it at {@code mock:result}.
+     */
+    @Test
+    public void deadletter() throws Exception {
+        result.expectedMessageCount(1);
+
+        template.send("direct:start", ExchangePattern.InOnly, exchange -> exchange.getIn().setBody("test1"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    /**
+     * Route configuration with a dead letter channel that targets the {@code <name>_deadletter} queue.
+     */
+    @Configuration
+    public class TestConfiguration extends BaseSqs.TestConfiguration {
+
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    String deadletterName = sharedNameGenerator.getName() + "_deadletter";
+
+                    // Failed exchanges go to the dead letter queue carrying the original message.
+                    errorHandler(deadLetterChannel(String.format("aws2-sqs://%s?autoCreateQueue=true", deadletterName))
+                            .useOriginalMessage());
+
+                    // The processor always fails, so the regular queue endpoint below is never reached.
+                    from("direct:start").startupOrder(2).process(e -> {
+                        throw new IllegalStateException();
+                    }).toF("aws2-sqs://%s?autoCreateQueue=true", sharedNameGenerator.getName());
+
+                    fromF("aws2-sqs://%s", deadletterName).to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDelayedQueueTest.java
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDelayedQueueTest.java
new file mode 100644
index 00000000000..c4f763f3e3b
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDelayedQueueTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Checks that a message sent to an auto-created delay queue only becomes receivable after the
+ * configured delay has elapsed.
+ * <p>
+ * Based on camel-quarkus Aws2SqsTest#sqsAutoCreateDelayedQueue
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsDelayedQueueTest.class,
+                BaseSqs.TestConfiguration.class
+        }
+)
+public class SqsDelayedQueueTest extends BaseSqs {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void delayedQueue() throws Exception {
+        int delay = 20;
+        String delayedQueueName = sharedNameGenerator.getName() + "_delayed";
+        Instant start = Instant.now();
+
+        // Create the delayed queue: the endpoint is configured with autoCreateQueue/delayQueue,
+        // the listQueues operation is merely the request used to trigger it.
+        ListQueuesResponse response = producerTemplate.requestBody(
+                String.format("aws2-sqs://%s?autoCreateQueue=true&delayQueue=true&delaySeconds=%d&operation=listQueues",
+                        delayedQueueName, delay),
+                null,
+                ListQueuesResponse.class);
+        Assertions.assertNotNull(response, "listQueues returned no response for " + delayedQueueName);
+
+        String msg = sendSingleMessageToQueue(delayedQueueName);
+        awaitMessageWithExpectedContentFromQueue(msg, delayedQueueName);
+
+        long elapsedSeconds = Duration.between(start, Instant.now()).getSeconds();
+        Assertions.assertTrue(elapsedSeconds >= delay,
+                "Message was received after " + elapsedSeconds + " s, expected at least " + delay + " s");
+    }
+
+    /**
+     * Polls the queue once per second, for at most 120 seconds, until the received body equals
+     * {@code expectedContent}. Messages are not deleted when read.
+     *
+     * @param expectedContent body that has to show up on the queue
+     * @param queueName       name of the queue to poll
+     */
+    private void awaitMessageWithExpectedContentFromQueue(String expectedContent, String queueName) {
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
+                () -> expectedContent.equals(receiveMessageFromQueue(queueName, false)));
+    }
+}
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsOperationsTest.java
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsOperationsTest.java
new file mode 100644
index 00000000000..a7cf07455fd
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsOperationsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
+import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Exercises individual producer operations of the aws2-sqs component (send, listQueues,
+ * deleteMessage, sendBatchMessage, purgeQueue) against a queue created once for the whole class.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsOperationsTest.class,
+                SqsOperationsTest.TestConfiguration.class
+        }
+)
+class SqsOperationsTest extends BaseSqs {
+
+    private static final String queueName = "Aws2SqsTest_queue_"
+            + RandomStringUtils.randomAlphanumeric(49).toLowerCase(Locale.ROOT);
+
+    private static String queueUrl;
+
+    /** Creates the queue shared by all tests and remembers its URL for the clean-up. */
+    @BeforeAll
+    protected static void setupResources() {
+        // The client is only needed for this one call, so it is closed right away.
+        try (SqsClient sqsClient = AWSSDKClientUtils.newSQSClient()) {
+            queueUrl = sqsClient.createQueue(
+                    CreateQueueRequest.builder()
+                            .queueName(queueName)
+                            .build())
+                    .queueUrl();
+        }
+    }
+
+    /** Deletes the queue created in {@link #setupResources()}. */
+    @AfterAll
+    protected static void cleanupResources() {
+        try (SqsClient sqsClient = AWSSDKClientUtils.newSQSClient()) {
+            sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build());
+        }
+    }
+
+    /** Purges the queue after each test and checks that nothing can be received from it any more. */
+    @AfterEach
+    void purgeQueueAndWait() {
+        purgeQueue(queueName);
+        Assertions.assertNull(receiveMessageFromQueue(queueName, false));
+    }
+
+    @Test
+    void simpleInOut() {
+        Assertions.assertTrue(clientListQueues().stream().distinct().anyMatch(u -> u.contains(queueName)));
+
+        final String msg = sendSingleMessageToQueue(queueName);
+        awaitMessageWithExpectedContentFromQueue(msg, queueName);
+    }
+
+    @Test
+    void listQueues() {
+        Assertions.assertTrue(clientListQueues().stream().distinct().anyMatch(u -> u.contains(queueName)));
+    }
+
+    @Test
+    void sqsDeleteMessage() {
+        sendSingleMessageToQueue(queueName);
+        final String receipt = receiveReceipt(queueName);
+        final String msg = sendSingleMessageToQueue(queueName);
+        deleteMessageFromQueue(queueName, receipt);
+        // The assertion is made twice on purpose: if the delete had not worked, the queue would hold two
+        // messages and a single receive could happen to return the right one, letting the test pass
+        // incorrectly. Receiving twice checks that the first message was really deleted.
+        Assertions.assertEquals(msg, receiveMessageFromQueue(queueName, false));
+        Assertions.assertEquals(msg, receiveMessageFromQueue(queueName, false));
+    }
+
+    @Test
+    void sqsSendBatchMessage() {
+        final List<String> messages = new ArrayList<>(Arrays.asList(
+                "Hello from camel-quarkus",
+                "This is a batch message test",
+                "Let's add few more messages",
+                "Next message will be last",
+                "Goodbye from camel-quarkus"));
+        Assertions.assertEquals(messages.size(), sendMessageBatchAndRetrieveSuccessCount(queueName, messages));
+    }
+
+    // helper methods
+
+    /** Invokes the {@code purgeQueue} operation on the given queue. */
+    private void purgeQueue(String queueName) {
+        producerTemplate.sendBodyAndHeader("aws2-sqs://" + queueName + "?operation=purgeQueue",
+                null,
+                Sqs2Constants.SQS_QUEUE_PREFIX,
+                queueName);
+    }
+
+    /**
+     * Sends all messages with the {@code sendBatchMessage} operation.
+     *
+     * @return the number of entries reported as successful in the batch response
+     */
+    private int sendMessageBatchAndRetrieveSuccessCount(String queueName, List<String> messages) {
+        return producerTemplate.requestBody(
+                "aws2-sqs://" + queueName + "?operation=sendBatchMessage",
+                messages,
+                SendMessageBatchResponse.class).successful().size();
+    }
+
+    /** Returns the queue URLs reported by the {@code listQueues} operation. */
+    private List<String> clientListQueues() {
+        return producerTemplate
+                .requestBody("aws2-sqs://" + queueName + "?operation=listQueues", null, ListQueuesResponse.class)
+                .queueUrls();
+    }
+
+    /**
+     * Receives one exchange from the queue (waiting up to 5000 ms) and returns its receipt handle header.
+     * Fails the test with a clear message instead of a NullPointerException when nothing arrives in time.
+     */
+    private String receiveReceipt(String queueName) {
+        Exchange exchange = consumerTemplate.receive("aws2-sqs://" + queueName, 5000);
+        Assertions.assertNotNull(exchange, "No message received from queue " + queueName + " within 5000 ms");
+        return exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class);
+    }
+
+    /**
+     * Polls the queue once per second, for at most 10 seconds, until the received body equals
+     * {@code expectedContent}. Messages are deleted when read.
+     */
+    private void awaitMessageWithExpectedContentFromQueue(String expectedContent, String queueName) {
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS).until(
+                () -> expectedContent.equals(receiveMessageFromQueue(queueName, true)));
+    }
+
+    /** Invokes the {@code deleteMessage} operation with the URL-decoded receipt handle. */
+    private void deleteMessageFromQueue(String queueName, String receipt) {
+        producerTemplate.sendBodyAndHeader("aws2-sqs://" + queueName + "?operation=deleteMessage",
+                null,
+                Sqs2Constants.RECEIPT_HANDLE,
+                URLDecoder.decode(receipt, StandardCharsets.UTF_8));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    /**
+     * Route configuration. The tests above address the queue directly through the templates and do
+     * not send anything to {@code direct:start}.
+     */
+    @Configuration
+    public class TestConfiguration extends BaseSqs.TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            final String sqsEndpointUri = String
+                    .format("aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+                            sharedNameGenerator.getName(),
+                            "1209600", "65536", "60",
+                            "file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:start").to(sqsEndpointUri);
+
+                    from(sqsEndpointUri).to("mock:result");
+                }
+            };
+        }
+    }
+
+}
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerAutoCreateQueueTest.java
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerAutoCreateQueueTest.java
new file mode 100644
index 00000000000..3844a1d3f75
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerAutoCreateQueueTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Sends a batch of messages to an auto-created queue while a large number of other queues already
+ * exists, and expects every message of the batch at {@code mock:result}.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsProducerAutoCreateQueueTest.class,
+                SqsProducerAutoCreateQueueTest.TestConfiguration.class
+        }
+)
+public class SqsProducerAutoCreateQueueTest extends BaseSqs {
+
+    /** Number of queues ({@code queue-0} ... {@code queue-1999}) created before each test. */
+    private static final int PRE_CREATED_QUEUES = 2000;
+
+    /** Queue used by the routes; its name is outside the pre-created range. */
+    private static final String ROUTE_QUEUE = "queue-2001";
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    /**
+     * Pre-creates {@link #PRE_CREATED_QUEUES} queues.
+     * NOTE(review): presumably this exists to exercise queue lookup when many queues are present - confirm.
+     */
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        // The client is only needed for the set-up, so it is closed once the queues exist.
+        try (SqsClient client = AWSSDKClientUtils.newSQSClient()) {
+            for (int i = 0; i < PRE_CREATED_QUEUES; i++) {
+                client.createQueue(CreateQueueRequest.builder().queueName("queue-" + i).build());
+            }
+        }
+    }
+
+    /**
+     * Sends a collection of five strings as one batch and expects five messages at the mock endpoint.
+     */
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(5);
+
+        template.send("direct:start", ExchangePattern.InOnly, exchange -> {
+            Collection<String> c = new ArrayList<>();
+            c.add("1");
+            c.add("2");
+            c.add("3");
+            c.add("4");
+            c.add("5");
+            exchange.getIn().setBody(c);
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    /**
+     * Route configuration: a batch-sending producer route into {@link #ROUTE_QUEUE} and a consumer
+     * route out of it.
+     */
+    @Configuration
+    public class TestConfiguration extends BaseSqs.TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:start").startupOrder(2)
+                            .setHeader(Sqs2Constants.SQS_OPERATION, constant("sendBatchMessage"))
+                            .toF("aws2-sqs://%s?autoCreateQueue=true", ROUTE_QUEUE);
+
+                    fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true", ROUTE_QUEUE)
+                            .startupOrder(1).log("${body}").to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git
a/components-starter/camel-aws2-sqs-starter/src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt
b/components-starter/camel-aws2-sqs-starter/src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt
new file mode 100644
index 00000000000..135a9337f25
--- /dev/null
+++
b/components-starter/camel-aws2-sqs-starter/src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt
@@ -0,0 +1 @@
+{"Version":"2008-10-17","Id":"/195004372649/MyNewCamelQueue/SQSDefaultPolicy","Statement":[{"Sid":"Queue1ReceiveMessage","Effect":"Allow","Principal":{"AWS":"*"},"Action":"SQS:ReceiveMessage","Resource":"/195004372649/MyNewCamelQueue"}]}