This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 33cc329ebd58bdb569357fde90ff8907d8062e48 Author: PyvesB <[email protected]> AuthorDate: Thu Dec 7 11:41:54 2017 +0000 CAMEL-12071 aws-sqs queue creation does not support FIFO queues --- .../camel/component/aws/sqs/SqsEndpoint.java | 30 +++++++------ .../camel/component/aws/sqs/SqsEndpointTest.java | 51 +++++++++++++++++++--- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index 8ba00c4..f1b2dc3 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -26,7 +26,6 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueResult; @@ -62,7 +61,7 @@ import org.slf4j.LoggerFactory; @UriEndpoint(firstVersion = "2.6.0", scheme = "aws-sqs", title = "AWS Simple Queue Service", syntax = "aws-sqs:queueNameOrArn", consumerClass = SqsConsumer.class, label = "cloud,messaging") public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterStrategyAware { - + private static final Logger LOG = LoggerFactory.getLogger(SqsEndpoint.class); private AmazonSQS client; @@ -82,7 +81,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt super(uri, component); this.configuration = configuration; } - + public HeaderFilterStrategy getHeaderFilterStrategy() { return headerFilterStrategy; } @@ -93,7 +92,7 @@ public class SqsEndpoint 
extends ScheduledPollEndpoint implements HeaderFilterSt public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { this.headerFilterStrategy = strategy; } - + public Producer createProducer() throws Exception { return new SqsProducer(this); } @@ -116,12 +115,12 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt protected void doStart() throws Exception { client = getConfiguration().getAmazonSQSClient() != null ? getConfiguration().getAmazonSQSClient() : getClient(); - + // Override the endpoint location if (ObjectHelper.isNotEmpty(getConfiguration().getAmazonSQSEndpoint())) { client.setEndpoint(getConfiguration().getAmazonSQSEndpoint()); } - + // check the setting the headerFilterStrategy if (headerFilterStrategy == null) { headerFilterStrategy = new SqsHeaderFilterStrategy(); @@ -165,6 +164,11 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt // creates a new queue, or returns the URL of an existing one CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName()); + if (getConfiguration().isFifoQueue()) { + request.getAttributes().put(QueueAttributeName.FifoQueue.name(), String.valueOf(true)); + boolean useContentBasedDeduplication = getConfiguration().getMessageDeduplicationIdStrategy() instanceof NullMessageDeduplicationIdStrategy; + request.getAttributes().put(QueueAttributeName.ContentBasedDeduplication.name(), String.valueOf(useContentBasedDeduplication)); + } if (getConfiguration().getDefaultVisibilityTimeout() != null) { request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); } @@ -184,10 +188,10 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy()); } LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), 
request); - + CreateQueueResult queueResult = client.createQueue(request); queueUrl = queueResult.getQueueUrl(); - + LOG.trace("Queue created and available at: {}", queueUrl); } @@ -238,10 +242,10 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt message.setHeader(SqsConstants.RECEIPT_HANDLE, msg.getReceiptHandle()); message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes()); message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes()); - + //Need to apply the SqsHeaderFilterStrategy this time HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy(); - //add all sqs message attributes as camel message headers so that knowledge of + //add all sqs message attributes as camel message headers so that knowledge of //the Sqs class MessageAttributeValue will not leak to the client for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) { String header = entry.getKey(); @@ -260,14 +264,14 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt public void setConfiguration(SqsConfiguration configuration) { this.configuration = configuration; } - + public AmazonSQS getClient() { if (client == null) { client = createClient(); } return client; } - + public void setClient(AmazonSQS client) { this.client = client; } @@ -326,7 +330,7 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { this.maxMessagesPerPoll = maxMessagesPerPoll; } - + private Object translateValue(MessageAttributeValue mav) { Object result = null; if (mav.getStringValue() != null) { diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java index 56a5fcb..d7122e1 100644 --- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java @@ -17,31 +17,36 @@ package org.apache.camel.component.aws.sqs; import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.CreateQueueResult; import com.amazonaws.services.sqs.model.GetQueueUrlRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; import com.amazonaws.services.sqs.model.ListQueuesResult; +import com.amazonaws.services.sqs.model.QueueAttributeName; import org.apache.camel.impl.DefaultCamelContext; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import static org.junit.Assert.assertEquals; public class SqsEndpointTest { - + private SqsEndpoint endpoint; private AmazonSQSClient amazonSQSClient; + private SqsConfiguration config; @Before public void setUp() throws Exception { amazonSQSClient = Mockito.mock(AmazonSQSClient.class); - - SqsConfiguration config = new SqsConfiguration(); + + config = new SqsConfiguration(); config.setQueueName("test-queue"); config.setAmazonSQSClient(amazonSQSClient); - + endpoint = new SqsEndpoint("aws-sqs://test-queue", new SqsComponent(new DefaultCamelContext()), config); - + } @Test @@ -69,4 +74,40 @@ public class SqsEndpointTest { Mockito.verify(amazonSQSClient).getQueueUrl(expectedGetQueueUrlRequest); } + + @Test + public void createQueueShouldCreateFifoQueueWithContentBasedDeduplication() { + config.setQueueName("test-queue.fifo"); + config.setMessageDeduplicationIdStrategy("useContentBasedDeduplication"); + + CreateQueueRequest expectedCreateQueueRequest = new CreateQueueRequest("test-queue.fifo") + .addAttributesEntry(QueueAttributeName.FifoQueue.name(), "true") + .addAttributesEntry(QueueAttributeName.ContentBasedDeduplication.name(), "true"); + 
Mockito.when(amazonSQSClient.createQueue(expectedCreateQueueRequest)) + .thenReturn(new CreateQueueResult() + .withQueueUrl("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo")); + + endpoint.createQueue(amazonSQSClient); + + Mockito.verify(amazonSQSClient).createQueue(expectedCreateQueueRequest); + assertEquals("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo", endpoint.getQueueUrl()); + } + + @Test + public void createQueueShouldCreateFifoQueueWithoutContentBasedDeduplication() { + config.setQueueName("test-queue.fifo"); + config.setMessageDeduplicationIdStrategy("useExchangeId"); + + CreateQueueRequest expectedCreateQueueRequest = new CreateQueueRequest("test-queue.fifo") + .addAttributesEntry(QueueAttributeName.FifoQueue.name(), "true") + .addAttributesEntry(QueueAttributeName.ContentBasedDeduplication.name(), "false"); + Mockito.when(amazonSQSClient.createQueue(expectedCreateQueueRequest)) + .thenReturn(new CreateQueueResult() + .withQueueUrl("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo")); + + endpoint.createQueue(amazonSQSClient); + + Mockito.verify(amazonSQSClient).createQueue(expectedCreateQueueRequest); + assertEquals("https://sqs.us-east-1.amazonaws.com/111222333/test-queue.fifo", endpoint.getQueueUrl()); + } } \ No newline at end of file -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
