This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git


The following commit(s) were added to refs/heads/main by this push:
     new af048b64680 CAMEL-17860 add tests in camel-aws2-sqs-starter (#505)
af048b64680 is described below

commit af048b646807db11a0c22a92e5a75e60c7c871b2
Author: JiriOndrusek <[email protected]>
AuthorDate: Wed Apr 6 09:16:52 2022 +0200

    CAMEL-17860 add tests in camel-aws2-sqs-starter (#505)
---
 components-starter/camel-aws2-sqs-starter/pom.xml  |  12 ++
 .../apache/camel/component/aws2/sqs/BaseSqs.java   |  70 ++++++++
 .../component/aws2/sqs/JmsStyleSelectorTest.java   | 100 +++++++++++
 .../camel/component/aws2/sqs/SqsComponentTest.java | 128 ++++++++++++++
 .../component/aws2/sqs/SqsDeadletterTest.java      |  96 ++++++++++
 .../component/aws2/sqs/SqsDelayedQueueTest.java    |  83 +++++++++
 .../component/aws2/sqs/SqsOperationsTest.java      | 194 +++++++++++++++++++++
 .../aws2/sqs/SqsProducerAutoCreateQueueTest.java   | 112 ++++++++++++
 .../org/apache/camel/component/aws2/sqs/policy.txt |   1 +
 9 files changed, 796 insertions(+)

diff --git a/components-starter/camel-aws2-sqs-starter/pom.xml 
b/components-starter/camel-aws2-sqs-starter/pom.xml
index 9600327f074..8d1eebf2070 100644
--- a/components-starter/camel-aws2-sqs-starter/pom.xml
+++ b/components-starter/camel-aws2-sqs-starter/pom.xml
@@ -47,6 +47,18 @@
       </exclusions>
       <!--END OF GENERATED CODE-->
     </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-infra-aws-v2</artifactId>
+      <version>${camel-version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
     <!--START OF GENERATED CODE-->
     <dependency>
       <groupId>org.apache.camel.springboot</groupId>
diff --git 
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/BaseSqs.java
 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/BaseSqs.java
new file mode 100644
index 00000000000..c05ff0076a0
--- /dev/null
+++ 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/BaseSqs.java
@@ -0,0 +1,70 @@
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Configuration;
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.camel.test.infra.common.SharedNameGenerator;
+import org.apache.camel.test.infra.common.TestEntityNameGenerator;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import software.amazon.awssdk.services.sqs.SqsClient;
+
+import java.util.UUID;
+
+/**
+ * Common base class for the aws2-sqs starter tests. It injects the Camel context and templates,
+ * registers the shared AWS test service and name generator, and offers small helpers for
+ * sending a message to a queue and receiving one back.
+ */
+public class BaseSqs {
+
+    @Autowired
+    protected CamelContext context;
+
+    @Autowired
+    protected ProducerTemplate producerTemplate;
+
+    @Autowired
+    protected ConsumerTemplate consumerTemplate;
+
+    // The tests built on this class only use aws2-sqs endpoints and an SqsClient, so the SQS
+    // test service has to be started here (the DynamoDB factory method was used by mistake).
+    @RegisterExtension
+    public static AWSService service = AWSServiceFactory.createSQSService();
+
+    @RegisterExtension
+    public static SharedNameGenerator sharedNameGenerator = new TestEntityNameGenerator();
+
+    /**
+     * Asserts that the expectations of all mock endpoints in {@link #context} are satisfied.
+     *
+     * @throws InterruptedException if interrupted while waiting for the expectations
+     */
+    protected void assertMockEndpointsSatisfied() throws InterruptedException {
+        MockEndpoint.assertIsSatisfied(this.context);
+    }
+
+    /**
+     * Closes the shared test service once all tests of the class have run. The method is not
+     * private because JUnit Jupiter documents that {@code @AfterAll} methods must not be private.
+     */
+    @AfterAll
+    static void closeClient() {
+        service.close();
+    }
+
+    /**
+     * Sends a message with a randomly generated body ("sqs" followed by a dash-less UUID) to the
+     * given queue.
+     *
+     * @param queueName name of the target queue
+     * @return the body produced by the request, converted to {@code String}
+     */
+    String sendSingleMessageToQueue(String queueName) {
+        final String msg = "sqs" + UUID.randomUUID().toString().replace("-", "");
+        return producerTemplate.requestBody("aws2-sqs://" + queueName, msg, String.class);
+    }
+
+    /**
+     * Receives one message body from the given queue, waiting at most 10 seconds.
+     *
+     * @param queueName     name of the queue to poll
+     * @param deleteMessage value passed for both {@code deleteAfterRead} and {@code deleteIfFiltered}
+     * @return the received body, or {@code null} if nothing was received within the timeout
+     */
+    String receiveMessageFromQueue(String queueName, boolean deleteMessage) {
+        final String uri = String.format(
+                "aws2-sqs://%s?deleteAfterRead=%s&deleteIfFiltered=%s&defaultVisibilityTimeout=0",
+                queueName, deleteMessage, deleteMessage);
+        return consumerTemplate.receiveBody(uri, 10000, String.class);
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public static class TestConfiguration {
+
+        /** Provides the SQS client created by the test-infra client utilities. */
+        @Bean
+        public SqsClient sqsClient(CamelContext context) {
+            return AWSSDKClientUtils.newSQSClient();
+        }
+    }
+}
diff --git 
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/JmsStyleSelectorTest.java
 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/JmsStyleSelectorTest.java
new file mode 100644
index 00000000000..45e5c861293
--- /dev/null
+++ 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/JmsStyleSelectorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Based on SqsConsumerMessageLocalstackIT.
+ *
+ * Two messages are sent to the queue; the consuming route filters out the one whose body is
+ * "ignore", so exactly one message is expected at the mock endpoint.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                JmsStyleSelectorTest.class,
+                JmsStyleSelectorTest.TestConfiguration.class
+        }
+)
+public class JmsStyleSelectorTest extends BaseSqs {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(1);
+
+        fireAndForget("ignore");
+        fireAndForget("test1");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    /** Sends the given body to direct:start using the InOnly exchange pattern. */
+    private void fireAndForget(String body) {
+        template.send("direct:start", ExchangePattern.InOnly, e -> e.getIn().setBody(body));
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration extends  BaseSqs.TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    final String queue = sharedNameGenerator.getName();
+
+                    // producer side: direct:start -> queue
+                    from("direct:start")
+                            .startupOrder(2)
+                            .toF("aws2-sqs://%s?autoCreateQueue=true", queue);
+
+                    // consumer side: queue -> filter -> mock:result
+                    fromF("aws2-sqs://%s?deleteAfterRead=false&deleteIfFiltered=true&autoCreateQueue=true", queue)
+                            .startupOrder(1)
+                            .filter(simple("${body} != 'ignore'"))
+                            .log("${body}")
+                            .log("${header.CamelAwsSqsReceiptHandle}")
+                            .to("mock:result");
+                }
+            };
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsComponentTest.java
 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsComponentTest.java
new file mode 100644
index 00000000000..52915ff9251
--- /dev/null
+++ 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsComponentTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Sends a message through an SQS queue and checks the body and SQS headers on both the
+ * consumed exchange and the exchange returned to the sender, for InOnly and InOut.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsComponentTest.class,
+                SqsComponentTest.TestConfiguration.class
+        }
+)
+public class SqsComponentTest extends BaseSqs {
+
+    /** Body sent by both tests. */
+    private static final String MESSAGE_TEXT = "This is my message text.";
+
+    /** MD5 value the tests expect in the MD5_OF_BODY header for {@link #MESSAGE_TEXT}. */
+    private static final String MESSAGE_TEXT_MD5 = "6a1559560f67c5e7a7d5d838bf0272ee";
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(1);
+
+        final Exchange sent = template.send("direct:start", ExchangePattern.InOnly,
+                e -> e.getIn().setBody(MESSAGE_TEXT));
+
+        assertMockEndpointsSatisfied();
+
+        // what the consumer route delivered to the mock
+        final Exchange received = result.getExchanges().get(0);
+        assertEquals(MESSAGE_TEXT, received.getIn().getBody());
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE));
+        assertEquals(MESSAGE_TEXT_MD5, received.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.ATTRIBUTES));
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES));
+
+        // what the producer wrote back onto the sender's exchange
+        assertNotNull(sent.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
+        assertEquals(MESSAGE_TEXT_MD5, sent.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
+    }
+
+    @Test
+    public void sendInOut() throws Exception {
+        result.expectedMessageCount(1);
+
+        final Exchange sent = template.send("direct:start", ExchangePattern.InOut,
+                e -> e.getIn().setBody(MESSAGE_TEXT));
+
+        assertMockEndpointsSatisfied();
+
+        // what the consumer route delivered to the mock
+        final Exchange received = result.getExchanges().get(0);
+        assertEquals(MESSAGE_TEXT, received.getIn().getBody());
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE));
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
+        assertEquals(MESSAGE_TEXT_MD5, received.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.ATTRIBUTES));
+        assertNotNull(received.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES));
+
+        // what the producer wrote back onto the sender's exchange
+        assertNotNull(sent.getMessage().getHeader(Sqs2Constants.MESSAGE_ID));
+        assertEquals(MESSAGE_TEXT_MD5, sent.getMessage().getHeader(Sqs2Constants.MD5_OF_BODY));
+    }
+
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration extends  BaseSqs.TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            // one URI shared by the producing and the consuming route
+            final String queueUri = String.format(
+                    "aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+                    sharedNameGenerator.getName(),
+                    "1209600",
+                    "65536",
+                    "60",
+                    "file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:start").to(queueUri);
+
+                    from(queueUri).to("mock:result");
+                }
+            };
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDeadletterTest.java
 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDeadletterTest.java
new file mode 100644
index 00000000000..a6d4a084526
--- /dev/null
+++ 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDeadletterTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+
+/**
+ * Verifies that an exchange failing inside the route is handed to a dead letter channel
+ * backed by an SQS queue, and that it can then be consumed from that queue.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsDeadletterTest.class,
+                SqsDeadletterTest.TestConfiguration.class
+        }
+)
+public class SqsDeadletterTest extends BaseSqs {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void deadletter() throws Exception {
+        result.expectedMessageCount(1);
+
+        template.send("direct:start", ExchangePattern.InOnly, e -> e.getIn().setBody("test1"));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration extends BaseSqs.TestConfiguration {
+
+        /**
+         * Builds the routes for the test: a producing route whose processor always throws, an
+         * error handler that redirects the original message to the dead letter queue, and a
+         * consuming route from that dead letter queue to mock:result.
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            // The endpoint URI that used to be formatted here was never used by any route,
+            // so it has been removed.
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    String deadletterName = sharedNameGenerator.getName() + "_deadletter";
+
+                    errorHandler(deadLetterChannel(String.format("aws2-sqs://%s?autoCreateQueue=true", deadletterName))
+                            .useOriginalMessage());
+
+                    // the processor fails on purpose so that the error handler takes over
+                    from("direct:start").startupOrder(2).process(e -> {
+                        throw new IllegalStateException();
+                    }).toF("aws2-sqs://%s?autoCreateQueue=true", sharedNameGenerator.getName());
+
+                    fromF("aws2-sqs://%s", deadletterName).to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git 
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDelayedQueueTest.java
 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDelayedQueueTest.java
new file mode 100644
index 00000000000..c4f763f3e3b
--- /dev/null
+++ 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsDelayedQueueTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Based on camel-quarkus Aws2SqsTest#sqsAutoCreateDelayedQueue
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsDelayedQueueTest.class,
+                BaseSqs.TestConfiguration.class
+        }
+)
+public class SqsDelayedQueueTest extends BaseSqs {
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    /**
+     * Sends one message to a queue configured with a delivery delay and checks that at least
+     * {@code delay} seconds pass between the start of the test and the moment the message
+     * has been received.
+     */
+    @Test
+    public void delayedQueue() throws Exception {
+        final int delay = 20;
+        final String delayedQueueName = sharedNameGenerator.getName() + "_delayed";
+        final Instant start = Instant.now();
+
+        //create delayed queue
+        final ListQueuesResponse listed = producerTemplate.requestBody(
+                String.format("aws2-sqs://%s?autoCreateQueue=true&delayQueue=true&delaySeconds=%d&operation=listQueues",
+                        delayedQueueName, delay),
+                null,
+                ListQueuesResponse.class);
+        // The listing itself is not inspected; only make sure the request produced a response.
+        Assertions.assertNotNull(listed, "listQueues request returned no response");
+
+        final String msg = sendSingleMessageToQueue(delayedQueueName);
+        awaitMessageWithExpectedContentFromQueue(msg, delayedQueueName);
+
+        Assertions.assertTrue(Duration.between(start, Instant.now()).getSeconds() >= delay);
+    }
+
+    /**
+     * Polls the queue once per second, for at most 120 seconds, until a received body equals the
+     * expected content. Messages are read without being deleted.
+     */
+    private void awaitMessageWithExpectedContentFromQueue(String expectedContent, String queueName) {
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(120, TimeUnit.SECONDS)
+                .until(() -> expectedContent.equals(receiveMessageFromQueue(queueName, false)));
+    }
+}
diff --git 
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsOperationsTest.java
 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsOperationsTest.java
new file mode 100644
index 00000000000..a7cf07455fd
--- /dev/null
+++ 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsOperationsTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
+import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsOperationsTest.class,
+                SqsOperationsTest.TestConfiguration.class
+        }
+)
+class SqsOperationsTest extends  BaseSqs {
+
+    private static final String queueName = "Aws2SqsTest_queue_" + 
RandomStringUtils.randomAlphanumeric(49).toLowerCase(Locale.ROOT);
+
+    private static String queueUrl;
+
+    @BeforeAll
+    protected static void setupResources()  {
+        final SqsClient sqsClient = AWSSDKClientUtils.newSQSClient();
+        {
+            queueUrl = sqsClient.createQueue(
+                            CreateQueueRequest.builder()
+                                    .queueName(queueName)
+                                    .build())
+                    .queueUrl();
+        }
+    }
+
+    @AfterAll
+    protected static void cleanupResources() {
+        final SqsClient sqsClient = AWSSDKClientUtils.newSQSClient();
+
+        
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build());
+    }
+
+    @AfterEach
+    void purgeQueueAndWait() {
+        purgeQueue(queueName);
+        Assertions.assertNull(receiveMessageFromQueue(queueName, false));
+    }
+
+
+    @Test
+    void simpleInOut() {
+        
Assertions.assertTrue(clientListQueues().stream().distinct().anyMatch(u -> 
u.contains(queueName)));
+
+        final String msg = sendSingleMessageToQueue(queueName);
+        awaitMessageWithExpectedContentFromQueue(msg, queueName);
+    }
+
+    @Test
+    void listQueues() {
+        
Assertions.assertTrue(clientListQueues().stream().distinct().anyMatch(u -> 
u.contains(queueName)));
+    }
+
+    @Test
+    void sqsDeleteMessage() {
+        sendSingleMessageToQueue(queueName);
+        final String receipt = receiveReceipt(queueName);
+        final String msg = sendSingleMessageToQueue(queueName);
+        deleteMessageFromQueue(queueName, receipt);
+        // assertion is here twice because in case delete wouldn't work in our 
queue would be two messages
+        // it's possible that the first retrieval would retrieve the correct 
message and therefore the test
+        // would incorrectly pass. By receiving message twice we check if the 
message was really deleted.
+        Assertions.assertEquals(receiveMessageFromQueue(queueName, false), 
msg);
+        Assertions.assertEquals(receiveMessageFromQueue(queueName, false), 
msg);
+    }
+
+    @Test
+    void sqsSendBatchMessage() {
+        final List<String> messages = new ArrayList<>(Arrays.asList(
+                "Hello from camel-quarkus",
+                "This is a batch message test",
+                "Let's add few more messages",
+                "Next message will be last",
+                "Goodbye from camel-quarkus"));
+        Assertions.assertEquals(messages.size(), 
sendMessageBatchAndRetrieveSuccessCount(queueName, messages));
+    }
+
+    // helper methods
+
+    private void purgeQueue(String queueName) {
+        producerTemplate.sendBodyAndHeader("aws2-sqs://" + queueName + 
"?operation=purgeQueue",
+                null,
+                Sqs2Constants.SQS_QUEUE_PREFIX,
+                queueName);
+    }
+
+    private int sendMessageBatchAndRetrieveSuccessCount(String queueName, 
List<String> messages) {
+        return producerTemplate.requestBody(
+                "aws2-sqs://" + queueName + "?operation=sendBatchMessage",
+                messages,
+                SendMessageBatchResponse.class).successful().size();
+    }
+
+    private List<String> clientListQueues() {
+        return producerTemplate.requestBody("aws2-sqs://" + queueName + 
"?operation=listQueues", null, ListQueuesResponse.class)
+                .queueUrls();
+    }
+
+
+
+    private String receiveReceipt(String queueName) {
+        Exchange exchange = consumerTemplate.receive("aws2-sqs://" + 
queueName, 5000);
+        return exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, 
String.class);
+    }
+
+    private void awaitMessageWithExpectedContentFromQueue(String 
expectedContent, String queueName) {
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(10, 
TimeUnit.SECONDS).until(
+                () -> 
expectedContent.equals(receiveMessageFromQueue(queueName, true)));
+    }
+
+    private void deleteMessageFromQueue(String queueName, String receipt) {
+        producerTemplate.sendBodyAndHeader("aws2-sqs://" + queueName + 
"?operation=deleteMessage",
+                null,
+                Sqs2Constants.RECEIPT_HANDLE,
+                URLDecoder.decode(receipt, StandardCharsets.UTF_8));
+    }
+
+
+    // *************************************
+    // Config
+    // *************************************
+
+    @Configuration
+    public class TestConfiguration extends  BaseSqs.TestConfiguration {
+        @Bean
+        public RouteBuilder routeBuilder() {
+            final String sqsEndpointUri = String
+                    
.format("aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+                            sharedNameGenerator.getName(),
+                            "1209600", "65536", "60",
+                            
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("direct:start").to(sqsEndpointUri);
+
+                    from(sqsEndpointUri).to("mock:result");
+                }
+            };
+        }
+    }
+
+}
diff --git 
a/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerAutoCreateQueueTest.java
 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerAutoCreateQueueTest.java
new file mode 100644
index 00000000000..3844a1d3f75
--- /dev/null
+++ 
b/components-starter/camel-aws2-sqs-starter/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerAutoCreateQueueTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.Configuration;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.annotation.DirtiesContext;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Sends a batch of messages through an aws2-sqs producer configured with
+ * {@code autoCreateQueue=true} while a large number of unrelated queues already exist.
+ *
+ * <p>Before each test {@value #PRE_CREATED_QUEUE_COUNT} queues named {@code queue-0},
+ * {@code queue-1}, ... are created. The routes then target {@value #TARGET_QUEUE}, which is not
+ * among the pre-created queues, and the test expects all five batch entries to arrive at
+ * {@code mock:result}.
+ */
+@DirtiesContext
+@CamelSpringBootTest
+@SpringBootTest(
+        classes = {
+                CamelAutoConfiguration.class,
+                SqsProducerAutoCreateQueueTest.class,
+                SqsProducerAutoCreateQueueTest.TestConfiguration.class
+        }
+)
+public class SqsProducerAutoCreateQueueTest extends BaseSqs {
+
+    /** Number of queues created up front, named {@code queue-0} .. {@code queue-1999}. */
+    private static final int PRE_CREATED_QUEUE_COUNT = 2000;
+
+    /** Queue used by the routes under test; it is not one of the pre-created queues. */
+    private static final String TARGET_QUEUE = "queue-2001";
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    /**
+     * Pre-creates {@value #PRE_CREATED_QUEUE_COUNT} queues so the routes run against an account
+     * that already holds many queues.
+     *
+     * @throws Exception if creating a queue fails
+     */
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        // The client is only needed for the setup, so release it as soon as the queues exist.
+        // NOTE(review): assumes newSQSClient() hands out a fresh client on every call - confirm.
+        try (SqsClient client = AWSSDKClientUtils.newSQSClient()) {
+            for (int i = 0; i < PRE_CREATED_QUEUE_COUNT; i++) {
+                client.createQueue(CreateQueueRequest.builder().queueName("queue-" + i).build());
+            }
+        }
+    }
+
+    /**
+     * Sends a collection of five strings as one InOnly exchange and expects five messages at
+     * {@code mock:result}.
+     *
+     * @throws Exception if the mock endpoint expectations are not satisfied
+     */
+    @Test
+    public void sendInOnly() throws Exception {
+        result.expectedMessageCount(5);
+
+        template.send("direct:start", ExchangePattern.InOnly, exchange -> {
+            // Each element of the collection is one entry of the batch.
+            Collection<String> batch = new ArrayList<>();
+            batch.add("1");
+            batch.add("2");
+            batch.add("3");
+            batch.add("4");
+            batch.add("5");
+            exchange.getIn().setBody(batch);
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    // *************************************
+    // Config
+    // *************************************
+
+    // NOTE(review): @Configuration resolves to org.apache.camel.Configuration (see the imports of
+    // this file), not to Spring's annotation. The class is listed explicitly in
+    // @SpringBootTest(classes = ...) above - confirm that this is the intended way to register it.
+    @Configuration
+    public class TestConfiguration extends BaseSqs.TestConfiguration {
+
+        /**
+         * Defines the producer route ({@code direct:start} to the target queue using the
+         * {@code sendBatchMessage} operation) and the consumer route (target queue to
+         * {@code mock:result}).
+         *
+         * @return the route builder defining both routes
+         */
+        @Bean
+        public RouteBuilder routeBuilder() {
+            return new RouteBuilder() {
+                @Override
+                public void configure() {
+                    // startupOrder: the consumer route (1) is started before the producer route (2).
+                    from("direct:start").startupOrder(2)
+                            .setHeader(Sqs2Constants.SQS_OPERATION, constant("sendBatchMessage"))
+                            .toF("aws2-sqs://%s?autoCreateQueue=true", TARGET_QUEUE);
+
+                    fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true", TARGET_QUEUE)
+                            .startupOrder(1).log("${body}").to("mock:result");
+                }
+            };
+        }
+    }
+}
diff --git a/components-starter/camel-aws2-sqs-starter/src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt b/components-starter/camel-aws2-sqs-starter/src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt
new file mode 100644
index 00000000000..135a9337f25
--- /dev/null
+++ b/components-starter/camel-aws2-sqs-starter/src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt
@@ -0,0 +1 @@
+{"Version":"2008-10-17","Id":"/195004372649/MyNewCamelQueue/SQSDefaultPolicy","Statement":[{"Sid":"Queue1ReceiveMessage","Effect":"Allow","Principal":{"AWS":"*"},"Action":"SQS:ReceiveMessage","Resource":"/195004372649/MyNewCamelQueue"}]}

Reply via email to