[
https://issues.apache.org/jira/browse/CAMEL-11224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221876#comment-16221876
]
ASF GitHub Bot commented on CAMEL-11224:
----------------------------------------
davsclaus closed pull request #2018: CAMEL-11224 aws-sqs producer does not
support new FIFO queues
URL: https://github.com/apache/camel/pull/2018
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
new file mode 100644
index 00000000000..be5be47b2fd
--- /dev/null
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public class ConstantMessageGroupIdStrategy implements MessageGroupIdStrategy {
+
+ @Override
+ public String getMessageGroupId(Exchange exchange) {
+ return "CamelSingleMessageGroup";
+ }
+
+}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdMessageDeduplicationIdStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdMessageDeduplicationIdStrategy.java
new file mode 100644
index 00000000000..6a50ffc5967
--- /dev/null
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdMessageDeduplicationIdStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public class ExchangeIdMessageDeduplicationIdStrategy implements
MessageDeduplicationIdStrategy {
+
+ @Override
+ public String getMessageDeduplicationId(Exchange exchange) {
+ return exchange.getExchangeId();
+ }
+
+}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdMessageGroupIdStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdMessageGroupIdStrategy.java
new file mode 100644
index 00000000000..af38e10b74f
--- /dev/null
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdMessageGroupIdStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public class ExchangeIdMessageGroupIdStrategy implements
MessageGroupIdStrategy {
+
+ @Override
+ public String getMessageGroupId(Exchange exchange) {
+ return exchange.getExchangeId();
+ }
+
+}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/MessageDeduplicationIdStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/MessageDeduplicationIdStrategy.java
new file mode 100644
index 00000000000..7f79ddf4526
--- /dev/null
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/MessageDeduplicationIdStrategy.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public interface MessageDeduplicationIdStrategy {
+
+ String getMessageDeduplicationId(Exchange exchange);
+
+}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/MessageGroupIdStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/MessageGroupIdStrategy.java
new file mode 100644
index 00000000000..23cb325f8fe
--- /dev/null
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/MessageGroupIdStrategy.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public interface MessageGroupIdStrategy {
+
+ String getMessageGroupId(Exchange exchange);
+
+}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullMessageDeduplicationIdStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullMessageDeduplicationIdStrategy.java
new file mode 100644
index 00000000000..6ab4ad475c5
--- /dev/null
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullMessageDeduplicationIdStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public class NullMessageDeduplicationIdStrategy implements
MessageDeduplicationIdStrategy {
+
+ @Override
+ public String getMessageDeduplicationId(Exchange exchange) {
+ return null;
+ }
+
+}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/PropertyValueMessageGroupIdStrategy.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/PropertyValueMessageGroupIdStrategy.java
new file mode 100644
index 00000000000..5a51ff753d4
--- /dev/null
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/PropertyValueMessageGroupIdStrategy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import org.apache.camel.Exchange;
+
+public class PropertyValueMessageGroupIdStrategy implements
MessageGroupIdStrategy {
+
+ @Override
+ public String getMessageGroupId(Exchange exchange) {
+ return exchange.getProperty(SqsConstants.MESSAGE_GROUP_ID_PROPERTY,
String.class);
+ }
+
+}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
index f8656701303..75fabdcc4ec 100644
---
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java
@@ -21,6 +21,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.ObjectHelper;
public class SqsComponent extends UriEndpointComponent {
@@ -62,6 +63,10 @@ protected Endpoint createEndpoint(String uri, String
remaining, Map<String, Obje
throw new IllegalArgumentException("Extending message visibility
(extendMessageVisibility) requires visibilityTimeout to be set on the
Endpoint.");
}
+ if (configuration.isFifoQueue() &&
ObjectHelper.isEmpty(configuration.getMessageGroupIdStrategy())) {
+ throw new IllegalArgumentException("messageGroupIdStrategy must be
set for FIFO queues.");
+ }
+
SqsEndpoint sqsEndpoint = new SqsEndpoint(uri, this, configuration);
sqsEndpoint.setConsumerProperties(parameters);
return sqsEndpoint;
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index 3683e4474d7..24816e393f2 100644
---
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -67,6 +67,10 @@
// producer properties
@UriParam(label = "producer")
private Integer delaySeconds;
+ @UriParam(label = "producer")
+ private MessageGroupIdStrategy messageGroupIdStrategy;
+ @UriParam(label = "producer", defaultValue = "useExchangeId")
+ private MessageDeduplicationIdStrategy messageDeduplicationIdStrategy =
new ExchangeIdMessageDeduplicationIdStrategy();
// queue properties
@UriParam(label = "queue")
@@ -83,6 +87,18 @@
private String redrivePolicy;
/**
+ * Whether or not the queue is a FIFO queue
+ */
+ boolean isFifoQueue() {
+ // AWS docs suggest this is valid derivation.
+ // FIFO queue names must end with .fifo, and standard queues cannot
+ if (queueName.endsWith(".fifo")) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
* The region with which the AWS-SQS client wants to work with.
* Only works if Camel creates the AWS-SQS client, i.e., if you explicitly
set amazonSQSClient,
* then this setting will have no effect. You would have to set it on the
client you create directly
@@ -364,4 +380,44 @@ public Integer getProxyPort() {
public void setProxyPort(Integer proxyPort) {
this.proxyPort = proxyPort;
}
+
+ /**
+ * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the
messageGroupId on the message.
+ * Can be one of the following options: *useConstant*, *useExchangeId*,
*usePropertyValue*.
+ * For the *usePropertyValue* option, the value of property
"CamelAwsMessageGroupId" will be used.
+ */
+ public void setMessageGroupIdStrategy(String strategy) {
+ if ("useConstant".equalsIgnoreCase(strategy)) {
+ messageGroupIdStrategy = new ConstantMessageGroupIdStrategy();
+ } else if ("useExchangeId".equalsIgnoreCase(strategy)) {
+ messageGroupIdStrategy = new ExchangeIdMessageGroupIdStrategy();
+ } else if ("usePropertyValue".equalsIgnoreCase(strategy)) {
+ messageGroupIdStrategy = new PropertyValueMessageGroupIdStrategy();
+ } else {
+ throw new IllegalArgumentException("Unrecognised
MessageGroupIdStrategy: " + strategy);
+ }
+ }
+
+ public MessageGroupIdStrategy getMessageGroupIdStrategy() {
+ return messageGroupIdStrategy;
+ }
+
+ public MessageDeduplicationIdStrategy getMessageDeduplicationIdStrategy() {
+ return messageDeduplicationIdStrategy;
+ }
+
+ /**
+ * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the
messageDeduplicationId on the message.
+ * Can be one of the following options: *useExchangeId*,
*useContentBasedDeduplication*.
+ * For the *useContentBasedDeduplication* option, no
messageDeduplicationId will be set on the message.
+ */
+ public void setMessageDeduplicationIdStrategy(String strategy) {
+ if ("useExchangeId".equalsIgnoreCase(strategy)) {
+ messageDeduplicationIdStrategy = new
ExchangeIdMessageDeduplicationIdStrategy();
+ } else if ("useContentBasedDeduplication".equalsIgnoreCase(strategy)) {
+ messageDeduplicationIdStrategy = new
NullMessageDeduplicationIdStrategy();
+ } else {
+ throw new IllegalArgumentException("Unrecognised
MessageDeduplicationIdStrategy: " + strategy);
+ }
+ }
}
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
index 61891c92b9c..3c2dec10806 100644
---
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java
@@ -28,4 +28,5 @@
String MESSAGE_ID = "CamelAwsSqsMessageId";
String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";
String DELAY_HEADER = "CamelAwsSqsDelaySeconds";
+ String MESSAGE_GROUP_ID_PROPERTY = "CamelAwsMessageGroupId";
}
\ No newline at end of file
diff --git
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
index 682d75e4f4c..dfc22790c1f 100644
---
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
+++
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java
@@ -42,9 +42,9 @@
*
*/
public class SqsProducer extends DefaultProducer {
-
+
private static final Logger LOG =
LoggerFactory.getLogger(SqsProducer.class);
-
+
private transient String sqsProducerToString;
public SqsProducer(SqsEndpoint endpoint) throws
NoFactoryAvailableException {
@@ -56,18 +56,33 @@ public void process(Exchange exchange) throws Exception {
SendMessageRequest request = new SendMessageRequest(getQueueUrl(),
body);
request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(),
exchange));
addDelay(request, exchange);
+ configureFifoAttributes(request, exchange);
LOG.trace("Sending request [{}] from exchange [{}]...", request,
exchange);
-
+
SendMessageResult result = getClient().sendMessage(request);
-
+
LOG.trace("Received result [{}]", result);
-
+
Message message = getMessageForResponse(exchange);
message.setHeader(SqsConstants.MESSAGE_ID, result.getMessageId());
message.setHeader(SqsConstants.MD5_OF_BODY,
result.getMD5OfMessageBody());
}
+ private void configureFifoAttributes(SendMessageRequest request, Exchange
exchange) {
+ if (getEndpoint().getConfiguration().isFifoQueue()) {
+ // use strategies
+ MessageGroupIdStrategy messageGroupIdStrategy =
getEndpoint().getConfiguration().getMessageGroupIdStrategy();
+ String messageGroupId =
messageGroupIdStrategy.getMessageGroupId(exchange);
+ request.setMessageGroupId(messageGroupId);
+
+ MessageDeduplicationIdStrategy messageDeduplicationIdStrategy =
getEndpoint().getConfiguration().getMessageDeduplicationIdStrategy();
+ String messageDeduplicationId =
messageDeduplicationIdStrategy.getMessageDeduplicationId(exchange);
+ request.setMessageDeduplicationId(messageDeduplicationId);
+
+ }
+ }
+
private void addDelay(SendMessageRequest request, Exchange exchange) {
Integer headerValue =
exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class);
Integer delayValue;
@@ -81,20 +96,20 @@ private void addDelay(SendMessageRequest request, Exchange
exchange) {
LOG.trace("found delay: " + delayValue);
request.setDelaySeconds(delayValue == null ? Integer.valueOf(0) :
delayValue);
}
-
+
protected AmazonSQS getClient() {
return getEndpoint().getClient();
}
-
+
protected String getQueueUrl() {
return getEndpoint().getQueueUrl();
}
-
+
@Override
public SqsEndpoint getEndpoint() {
return (SqsEndpoint) super.getEndpoint();
}
-
+
@Override
public String toString() {
if (sqsProducerToString == null) {
@@ -102,7 +117,7 @@ public String toString() {
}
return sqsProducerToString;
}
-
+
private Map<String, MessageAttributeValue> translateAttributes(Map<String,
Object> headers, Exchange exchange) {
Map<String, MessageAttributeValue> result = new HashMap<String,
MessageAttributeValue>();
HeaderFilterStrategy headerFilterStrategy =
getEndpoint().getHeaderFilterStrategy();
diff --git
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java
new file mode 100644
index 00000000000..ace51733971
--- /dev/null
+++
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java
@@ -0,0 +1,32 @@
+package org.apache.camel.component.aws.sqs;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class SqsConfigurationTest {
+
+ @Test
+ public void itReturnsAnInformativeErrorForBadMessageGroupIdStrategy()
throws Exception {
+ SqsConfiguration sqsConfiguration = new SqsConfiguration();
+ try {
+ sqsConfiguration.setMessageGroupIdStrategy("useUnknownStrategy");
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ assertTrue("Bad error message: " + e.getMessage(),
e.getMessage().startsWith("Unrecognised MessageGroupIdStrategy"));
+ }
+ }
+
+
+ @Test
+ public void
itReturnsAnInformativeErrorForBadMessageDeduplicationIdStrategy() throws
Exception {
+ SqsConfiguration sqsConfiguration = new SqsConfiguration();
+ try {
+
sqsConfiguration.setMessageDeduplicationIdStrategy("useUnknownStrategy");
+ fail("Should have thrown exception");
+ } catch (Exception e) {
+ assertTrue("Bad error message: " + e.getMessage(),
e.getMessage().startsWith("Unrecognised MessageDeduplicationIdStrategy"));
+ }
+ }
+
+}
diff --git
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
index bed350ce6c3..e1e511d5ff0 100644
---
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
+++
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java
@@ -55,6 +55,7 @@
private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 =
"heder_value_3";
private static final String SAMPLE_MESSAGE_HEADER_NAME_4 = "CamelHeader_1";
private static final String SAMPLE_MESSAGE_HEADER_VALUE_4 = "testValue";
+ private static final String SAMPLE_EXCHANGE_ID =
"ID:whatever-the-hostname-is-32818-1506943497897-1:1:8:1:75939";
Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS);
@@ -75,12 +76,14 @@ public void setup() throws Exception {
sqsConfiguration = new SqsConfiguration();
HeaderFilterStrategy headerFilterStrategy = new
SqsHeaderFilterStrategy();
sqsConfiguration.setDelaySeconds(Integer.valueOf(0));
+ sqsConfiguration.setQueueName("queueName");
when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient);
when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration);
when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult);
when(exchange.getOut()).thenReturn(outMessage);
when(exchange.getIn()).thenReturn(inMessage);
when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly);
+ when(exchange.getExchangeId()).thenReturn(SAMPLE_EXCHANGE_ID);
when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY);
when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL);
when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy);
@@ -194,8 +197,92 @@ public void isAllAttributeMessagesOnTheRequest() throws
Exception {
.getStringValue());
assertEquals(3, capture.getValue().getMessageAttributes().size());
}
+
+ @Test
+ public void itSetsMessageGroupIdUsingConstantStrategy() throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture =
ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals("CamelSingleMessageGroup",
capture.getValue().getMessageGroupId());
+
+ }
+ @Test
+ public void itSetsMessageGroupIdUsingExchangeIdStrategy() throws Exception
{
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useExchangeId");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture =
ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_EXCHANGE_ID,
capture.getValue().getMessageGroupId());
+
+ }
+ @Test
+ public void itSetsMessageGroupIdUsingHeaderValueStrategy() throws
Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("usePropertyValue");
+ when(exchange.getProperty(SqsConstants.MESSAGE_GROUP_ID_PROPERTY,
String.class)).thenReturn("my-group-id");
+
+ underTest.process(exchange);
+ ArgumentCaptor<SendMessageRequest> capture =
ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals("my-group-id", capture.getValue().getMessageGroupId());
+
+ }
+
+ @Test
+ public void itSetsMessageDedpulicationIdUsingExchangeIdStrategy() throws
Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+ sqsConfiguration.setMessageDeduplicationIdStrategy("useExchangeId");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture =
ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_EXCHANGE_ID,
capture.getValue().getMessageDeduplicationId());
+
+ }
+
+ @Test
+ public void itSetsMessageDedpulicationIdUsingExchangeIdStrategyAsDefault()
throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture =
ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertEquals(SAMPLE_EXCHANGE_ID,
capture.getValue().getMessageDeduplicationId());
+
+ }
+
+ @Test
+ public void
itDoesNotSetMessageDedpulicationIdUsingContentBasedDeduplicationStrategy()
throws Exception {
+ sqsConfiguration.setQueueName("queueName.fifo");
+ sqsConfiguration.setMessageGroupIdStrategy("useConstant");
+
sqsConfiguration.setMessageDeduplicationIdStrategy("useContentBasedDeduplication");
+
+ underTest.process(exchange);
+
+ ArgumentCaptor<SendMessageRequest> capture =
ArgumentCaptor.forClass(SendMessageRequest.class);
+ verify(amazonSQSClient).sendMessage(capture.capture());
+
+ assertNull(capture.getValue().getMessageDeduplicationId());
+
+ }
}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> aws-sqs producer does not support new FIFO queues
> -------------------------------------------------
>
> Key: CAMEL-11224
> URL: https://issues.apache.org/jira/browse/CAMEL-11224
> Project: Camel
> Issue Type: Improvement
> Components: camel-aws
> Reporter: Emilio Corengia
> Fix For: 2.21.0
>
>
> At the moment *aws-sqs producer* does not support the new _FIFO_ queue
> mechanism from AWS ([FIFO-queues
> ref|http://http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html]).
> There is a new parameter, +MessageGroupId+, that is required for FIFO queues
> (it is not available on Standard queues).
> The message group ID is the tag that specifies that a message belongs to a
> specific message group. Messages that belong to the same message group are
> always processed one by one, in a strict order relative to the message group
> (however, messages that belong to different message groups might be processed
> out of order). +Source+:
> http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queue-recommendations.html#using-messagegroupid-property.
> This improvement consists on adding support for MessageGroupId on the
> producer side of aws-sqs. It should be applicable on FIFO queues only. It
> could be customizable via URI options. A default strategy should be provided.
> This default strategy will generate a unique message group ID for each
> message to favor throughput and latency over ordering of messages.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)