This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 33cc329ebd58bdb569357fde90ff8907d8062e48
Author: PyvesB <[email protected]>
AuthorDate: Thu Dec 7 11:41:54 2017 +0000

    CAMEL-12071 aws-sqs queue creation does not support FIFO queues
---
 .../camel/component/aws/sqs/SqsEndpoint.java       | 30 +++++++------
 .../camel/component/aws/sqs/SqsEndpointTest.java   | 51 +++++++++++++++++++---
 2 files changed, 63 insertions(+), 18 deletions(-)

diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index 8ba00c4..f1b2dc3 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -26,7 +26,6 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
 import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
 import com.amazonaws.services.sqs.model.CreateQueueRequest;
 import com.amazonaws.services.sqs.model.CreateQueueResult;
@@ -62,7 +61,7 @@ import org.slf4j.LoggerFactory;
 @UriEndpoint(firstVersion = "2.6.0", scheme = "aws-sqs", title = "AWS Simple 
Queue Service", syntax = "aws-sqs:queueNameOrArn",
     consumerClass = SqsConsumer.class, label = "cloud,messaging")
 public class SqsEndpoint extends ScheduledPollEndpoint implements 
HeaderFilterStrategyAware {
-    
+
     private static final Logger LOG = 
LoggerFactory.getLogger(SqsEndpoint.class);
 
     private AmazonSQS client;
@@ -82,7 +81,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
         super(uri, component);
         this.configuration = configuration;
     }
-    
+
     public HeaderFilterStrategy getHeaderFilterStrategy() {
         return headerFilterStrategy;
     }
@@ -93,7 +92,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
     public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) {
         this.headerFilterStrategy = strategy;
     }
-   
+
     public Producer createProducer() throws Exception {
         return new SqsProducer(this);
     }
@@ -116,12 +115,12 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
     protected void doStart() throws Exception {
         client = getConfiguration().getAmazonSQSClient() != null
             ? getConfiguration().getAmazonSQSClient() : getClient();
-            
+
         // Override the endpoint location
         if 
(ObjectHelper.isNotEmpty(getConfiguration().getAmazonSQSEndpoint())) {
             client.setEndpoint(getConfiguration().getAmazonSQSEndpoint());
         }
-        
+
         // check the setting the headerFilterStrategy
         if (headerFilterStrategy == null) {
             headerFilterStrategy = new SqsHeaderFilterStrategy();
@@ -165,6 +164,11 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
 
         // creates a new queue, or returns the URL of an existing one
         CreateQueueRequest request = new 
CreateQueueRequest(configuration.getQueueName());
+        if (getConfiguration().isFifoQueue()) {
+            request.getAttributes().put(QueueAttributeName.FifoQueue.name(), 
String.valueOf(true));
+            boolean useContentBasedDeduplication = 
getConfiguration().getMessageDeduplicationIdStrategy() instanceof 
NullMessageDeduplicationIdStrategy;
+            
request.getAttributes().put(QueueAttributeName.ContentBasedDeduplication.name(),
 String.valueOf(useContentBasedDeduplication));
+        }
         if (getConfiguration().getDefaultVisibilityTimeout() != null) {
             
request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), 
String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
         }
@@ -184,10 +188,10 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
             
request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), 
getConfiguration().getRedrivePolicy());
         }
         LOG.trace("Creating queue [{}] with request [{}]...", 
configuration.getQueueName(), request);
-        
+
         CreateQueueResult queueResult = client.createQueue(request);
         queueUrl = queueResult.getQueueUrl();
-        
+
         LOG.trace("Queue created and available at: {}", queueUrl);
     }
 
@@ -238,10 +242,10 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
         message.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle());
         message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes());
         message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, 
msg.getMessageAttributes());
-        
+
         //Need to apply the SqsHeaderFilterStrategy this time
         HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy();
-        //add all sqs message attributes as camel message headers so that 
knowledge of 
+        //add all sqs message attributes as camel message headers so that 
knowledge of
         //the Sqs class MessageAttributeValue will not leak to the client
         for (Entry<String, MessageAttributeValue> entry : 
msg.getMessageAttributes().entrySet()) {
             String header = entry.getKey();
@@ -260,14 +264,14 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
     public void setConfiguration(SqsConfiguration configuration) {
         this.configuration = configuration;
     }
-    
+
     public AmazonSQS getClient() {
         if (client == null) {
             client = createClient();
         }
         return client;
     }
-    
+
     public void setClient(AmazonSQS client) {
         this.client = client;
     }
@@ -326,7 +330,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint 
implements HeaderFilterSt
     public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
-    
+
     private Object translateValue(MessageAttributeValue mav) {
         Object result = null;
         if (mav.getStringValue() != null) {
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java
index 56a5fcb..d7122e1 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java
@@ -17,31 +17,36 @@
 package org.apache.camel.component.aws.sqs;
 
 import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
 import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
 import com.amazonaws.services.sqs.model.GetQueueUrlResult;
 import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.amazonaws.services.sqs.model.QueueAttributeName;
 
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertEquals;
 
 public class SqsEndpointTest {
-    
+
     private SqsEndpoint endpoint;
     private AmazonSQSClient amazonSQSClient;
+    private SqsConfiguration config;
 
     @Before
     public void setUp() throws Exception {
         amazonSQSClient = Mockito.mock(AmazonSQSClient.class);
-        
-        SqsConfiguration config = new SqsConfiguration();
+
+        config = new SqsConfiguration();
         config.setQueueName("test-queue");
         config.setAmazonSQSClient(amazonSQSClient);
-        
+
         endpoint = new SqsEndpoint("aws-sqs://test-queue", new 
SqsComponent(new DefaultCamelContext()), config);
-        
+
     }
 
     @Test
@@ -69,4 +74,40 @@ public class SqsEndpointTest {
         
Mockito.verify(amazonSQSClient).getQueueUrl(expectedGetQueueUrlRequest);
 
     }
+
+    @Test
+    public void 
createQueueShouldCreateFifoQueueWithContentBasedDeduplication() {
+        config.setQueueName("test-queue.fifo");
+        
config.setMessageDeduplicationIdStrategy("useContentBasedDeduplication");
+
+        CreateQueueRequest expectedCreateQueueRequest = new 
CreateQueueRequest("test-queue.fifo")
+                .addAttributesEntry(QueueAttributeName.FifoQueue.name(), 
"true")
+                
.addAttributesEntry(QueueAttributeName.ContentBasedDeduplication.name(), 
"true");
+        Mockito.when(amazonSQSClient.createQueue(expectedCreateQueueRequest))
+                .thenReturn(new CreateQueueResult()
+                                
.withQueueUrl("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo"));
+
+        endpoint.createQueue(amazonSQSClient);
+
+        
Mockito.verify(amazonSQSClient).createQueue(expectedCreateQueueRequest);
+        
assertEquals("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo", 
endpoint.getQueueUrl());
+    }
+
+    @Test
+    public void 
createQueueShouldCreateFifoQueueWithoutContentBasedDeduplication() {
+        config.setQueueName("test-queue.fifo");
+        config.setMessageDeduplicationIdStrategy("useExchangeId");
+
+        CreateQueueRequest expectedCreateQueueRequest = new 
CreateQueueRequest("test-queue.fifo")
+                .addAttributesEntry(QueueAttributeName.FifoQueue.name(), 
"true")
+                
.addAttributesEntry(QueueAttributeName.ContentBasedDeduplication.name(), 
"false");
+        Mockito.when(amazonSQSClient.createQueue(expectedCreateQueueRequest))
+                .thenReturn(new CreateQueueResult()
+                                
.withQueueUrl("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo"));
+
+        endpoint.createQueue(amazonSQSClient);
+
+        
Mockito.verify(amazonSQSClient).createQueue(expectedCreateQueueRequest);
+        
assertEquals("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo", 
endpoint.getQueueUrl());
+    }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to