This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit b139e822bcb79cf51ca5a3b9add5e6ef8e3a313d Author: Jon McEwen <[email protected]> AuthorDate: Tue Oct 10 10:36:18 2017 +0100 CAMEL-11224 aws-sqs producer does not support new FIFO queues --- ...ts.java => ConstantMessageGroupIdStrategy.java} | 21 +++--- .../{SqsConstants.java => ExchangeIdStrategy.java} | 21 +++--- ...{SqsConstants.java => HeaderValueStrategy.java} | 21 +++--- .../camel/component/aws/sqs/NullStrategy.java | 12 +++ .../camel/component/aws/sqs/SqsComponent.java | 4 + .../camel/component/aws/sqs/SqsConfiguration.java | 56 ++++++++++++++ .../camel/component/aws/sqs/SqsConstants.java | 1 + .../camel/component/aws/sqs/SqsProducer.java | 35 ++++++--- ...s.java => StringValueFromExchangeStrategy.java} | 19 ++--- .../component/aws/sqs/SqsConfigurationTest.java | 32 ++++++++ .../camel/component/aws/sqs/SqsProducerTest.java | 87 ++++++++++++++++++++++ 11 files changed, 251 insertions(+), 58 deletions(-) diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java similarity index 67% copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java index 61891c9..44caf36 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ConstantMessageGroupIdStrategy.java @@ -16,16 +16,13 @@ */ package org.apache.camel.component.aws.sqs; -/** - * Constants used in Camel AWS SQS module - * - */ -public interface SqsConstants { +import org.apache.camel.Exchange; + +public class ConstantMessageGroupIdStrategy implements StringValueFromExchangeStrategy { + + @Override + public String value(Exchange exchange) { + return "CamelSingleMessageGroup"; + } - String ATTRIBUTES = "CamelAwsSqsAttributes"; - String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes"; - String MD5_OF_BODY = "CamelAwsSqsMD5OfBody"; - String MESSAGE_ID = "CamelAwsSqsMessageId"; - String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle"; - String DELAY_HEADER = "CamelAwsSqsDelaySeconds"; -} \ No newline at end of file +} diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java similarity index 67% copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java index 61891c9..6b0dac1 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/ExchangeIdStrategy.java @@ -16,16 +16,13 @@ */ package org.apache.camel.component.aws.sqs; -/** - * Constants used in Camel AWS SQS module - * - */ -public interface SqsConstants { +import org.apache.camel.Exchange; + +public class ExchangeIdStrategy implements StringValueFromExchangeStrategy { + + @Override + public String value(Exchange exchange) { + return exchange.getExchangeId(); + } - String ATTRIBUTES = "CamelAwsSqsAttributes"; - String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes"; - String MD5_OF_BODY = "CamelAwsSqsMD5OfBody"; - String MESSAGE_ID = "CamelAwsSqsMessageId"; - String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle"; - String DELAY_HEADER = "CamelAwsSqsDelaySeconds"; -} \ No newline at end of file +} diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java similarity index 67% copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java index 61891c9..0bef8c2 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/HeaderValueStrategy.java @@ -16,16 +16,13 @@ */ package org.apache.camel.component.aws.sqs; -/** - * Constants used in Camel AWS SQS module - * - */ -public interface SqsConstants { +import org.apache.camel.Exchange; + +public class HeaderValueStrategy implements StringValueFromExchangeStrategy { + + @Override + public String value(Exchange exchange) { + return exchange.getIn().getHeader(SqsConstants.MESSAGE_GROUP_ID_HEADER, String.class); + } - String ATTRIBUTES = "CamelAwsSqsAttributes"; - String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes"; - String MD5_OF_BODY = "CamelAwsSqsMD5OfBody"; - String MESSAGE_ID = "CamelAwsSqsMessageId"; - String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle"; - String DELAY_HEADER = "CamelAwsSqsDelaySeconds"; -} \ No newline at end of file +} diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java new file mode 100644 index 0000000..bbb5ab3 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/NullStrategy.java @@ -0,0 +1,12 @@ +package org.apache.camel.component.aws.sqs; + +import org.apache.camel.Exchange; + +public class NullStrategy implements StringValueFromExchangeStrategy { + + @Override + public String value(Exchange exchange) { + return null; + } + +} diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java index f865670..653b956 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsComponent.java @@ -62,6 +62,10 @@ public class SqsComponent extends UriEndpointComponent { throw new IllegalArgumentException("Extending message visibility (extendMessageVisibility) requires visibilityTimeout to be set on the Endpoint."); } + if (configuration.isFifoQueue() && configuration.getMessageGroupIdStrategy() == null) { + throw new IllegalArgumentException("messageGroupIdStrategy must be set for FIFO queues."); + } + SqsEndpoint sqsEndpoint = new SqsEndpoint(uri, this, configuration); sqsEndpoint.setConsumerProperties(parameters); return sqsEndpoint; diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java index 3683e44..dbab657f 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java @@ -67,6 +67,10 @@ public class SqsConfiguration { // producer properties @UriParam(label = "producer") private Integer delaySeconds; + @UriParam(label = "producer") + private StringValueFromExchangeStrategy messageGroupIdStrategy; + @UriParam(label = "producer", defaultValue = "useExchangeId") + private StringValueFromExchangeStrategy messageDeduplicationIdStrategy = new ExchangeIdStrategy(); // queue properties @UriParam(label = "queue") @@ -83,6 +87,18 @@ public class SqsConfiguration { private String redrivePolicy; /** + * Whether or not the queue is a FIFO queue + */ + public boolean isFifoQueue() { + // AWS docs suggest this is valid derivation. + // FIFO queue names must end with .fifo, and standard queues cannot + if (queueName.endsWith(".fifo")) { + return true; + } + return false; + } + + /** * The region with which the AWS-SQS client wants to work with. * Only works if Camel creates the AWS-SQS client, i.e., if you explicitly set amazonSQSClient, * then this setting will have no effect. You would have to set it on the client you create directly @@ -364,4 +380,44 @@ public class SqsConfiguration { public void setProxyPort(Integer proxyPort) { this.proxyPort = proxyPort; } + + /** + * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the messageGroupId on the message. + * Can be one of the following options: *useConstant*, *useExchangeId*, *useHeaderValue*. + * For the *useHeaderValue* option, the value of header "CamelAwsMessageGroupId" will be used. + */ + public void setMessageGroupIdStrategy(String strategy) { + if ("useConstant".equalsIgnoreCase(strategy)) { + messageGroupIdStrategy = new ConstantMessageGroupIdStrategy(); + } else if ("useExchangeId".equalsIgnoreCase(strategy)) { + messageGroupIdStrategy = new ExchangeIdStrategy(); + } else if ("useHeaderValue".equalsIgnoreCase(strategy)) { + messageGroupIdStrategy = new HeaderValueStrategy(); + } else { + throw new IllegalArgumentException("Unrecognised MessageGroupIdStrategy: " + strategy); + } + } + + public StringValueFromExchangeStrategy getMessageGroupIdStrategy() { + return messageGroupIdStrategy; + } + + public StringValueFromExchangeStrategy getMessageDeduplicationIdStrategy() { + return messageDeduplicationIdStrategy; + } + + /** + * Since *Camel 2.20*. Only for FIFO queues. Strategy for setting the messageDeduplicationId on the message. + * Can be one of the following options: *useExchangeId*, *useContentBasedDeduplication*. + * For the *useContentBasedDeduplication* option, no messageDeduplicationId will be set on the message. + */ + public void setMessageDeduplicationIdStrategy(String strategy) { + if ("useExchangeId".equalsIgnoreCase(strategy)) { + messageDeduplicationIdStrategy = new ExchangeIdStrategy(); + } else if ("useContentBasedDeduplication".equalsIgnoreCase(strategy)) { + messageDeduplicationIdStrategy = new NullStrategy(); + } else { + throw new IllegalArgumentException("Unrecognised MessageDeduplicationIdStrategy: " + strategy); + } + } } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java index 61891c9..5fc53c6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java @@ -28,4 +28,5 @@ public interface SqsConstants { String MESSAGE_ID = "CamelAwsSqsMessageId"; String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle"; String DELAY_HEADER = "CamelAwsSqsDelaySeconds"; + String MESSAGE_GROUP_ID_HEADER = "CamelAwsMessageGroupId"; } \ No newline at end of file diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java index 682d75e..e3d2f00 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java @@ -42,9 +42,9 @@ import static org.apache.camel.component.aws.common.AwsExchangeUtil.getMessageFo * */ public class SqsProducer extends DefaultProducer { - + private static final Logger LOG = LoggerFactory.getLogger(SqsProducer.class); - + private transient String sqsProducerToString; public SqsProducer(SqsEndpoint endpoint) throws NoFactoryAvailableException { @@ -56,18 +56,33 @@ public class SqsProducer extends DefaultProducer { SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body); request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(), exchange)); addDelay(request, exchange); + configureFifoAttributes(request, exchange); LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange); - + SendMessageResult result = getClient().sendMessage(request); - + LOG.trace("Received result [{}]", result); - + Message message = getMessageForResponse(exchange); message.setHeader(SqsConstants.MESSAGE_ID, result.getMessageId()); message.setHeader(SqsConstants.MD5_OF_BODY, result.getMD5OfMessageBody()); } + private void configureFifoAttributes(SendMessageRequest request, Exchange exchange) { + if (getEndpoint().getConfiguration().isFifoQueue()) { + // use strategies + StringValueFromExchangeStrategy messageGroupIdStrategy = getEndpoint().getConfiguration().getMessageGroupIdStrategy(); + String messageGroupId = messageGroupIdStrategy.value(exchange); + request.setMessageGroupId(messageGroupId); + + StringValueFromExchangeStrategy messageDeduplicationIdStrategy = getEndpoint().getConfiguration().getMessageDeduplicationIdStrategy(); + String messageDeduplicationId = messageDeduplicationIdStrategy.value(exchange); + request.setMessageDeduplicationId(messageDeduplicationId); + + } + } + private void addDelay(SendMessageRequest request, Exchange exchange) { Integer headerValue = exchange.getIn().getHeader(SqsConstants.DELAY_HEADER, Integer.class); Integer delayValue; @@ -81,20 +96,20 @@ public class SqsProducer extends DefaultProducer { LOG.trace("found delay: " + delayValue); request.setDelaySeconds(delayValue == null ? Integer.valueOf(0) : delayValue); } - + protected AmazonSQS getClient() { return getEndpoint().getClient(); } - + protected String getQueueUrl() { return getEndpoint().getQueueUrl(); } - + @Override public SqsEndpoint getEndpoint() { return (SqsEndpoint) super.getEndpoint(); } - + @Override public String toString() { if (sqsProducerToString == null) { @@ -102,7 +117,7 @@ public class SqsProducer extends DefaultProducer { } return sqsProducerToString; } - + private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) { Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>(); HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy(); diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java similarity index 67% copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java index 61891c9..31eafb6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/StringValueFromExchangeStrategy.java @@ -16,16 +16,11 @@ */ package org.apache.camel.component.aws.sqs; -/** - * Constants used in Camel AWS SQS module - * - */ -public interface SqsConstants { +import org.apache.camel.Exchange; + +@FunctionalInterface +public interface StringValueFromExchangeStrategy { + + String value(Exchange exchange); - String ATTRIBUTES = "CamelAwsSqsAttributes"; - String MESSAGE_ATTRIBUTES = "CamelAwsSqsMessageAttributes"; - String MD5_OF_BODY = "CamelAwsSqsMD5OfBody"; - String MESSAGE_ID = "CamelAwsSqsMessageId"; - String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle"; - String DELAY_HEADER = "CamelAwsSqsDelaySeconds"; -} \ No newline at end of file +} diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java new file mode 100644 index 0000000..ace5173 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsConfigurationTest.java @@ -0,0 +1,32 @@ +package org.apache.camel.component.aws.sqs; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class SqsConfigurationTest { + + @Test + public void itReturnsAnInformativeErrorForBadMessageGroupIdStrategy() throws Exception { + SqsConfiguration sqsConfiguration = new SqsConfiguration(); + try { + sqsConfiguration.setMessageGroupIdStrategy("useUnknownStrategy"); + fail("Should have thrown exception"); + } catch (Exception e) { + assertTrue("Bad error message: " + e.getMessage(), e.getMessage().startsWith("Unrecognised MessageGroupIdStrategy")); + } + } + + + @Test + public void itReturnsAnInformativeErrorForBadMessageDeduplicationIdStrategy() throws Exception { + SqsConfiguration sqsConfiguration = new SqsConfiguration(); + try { + sqsConfiguration.setMessageDeduplicationIdStrategy("useUnknownStrategy"); + fail("Should have thrown exception"); + } catch (Exception e) { + assertTrue("Bad error message: " + e.getMessage(), e.getMessage().startsWith("Unrecognised MessageDeduplicationIdStrategy")); + } + } + +} diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java index bed350c..c1d24f1 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java @@ -55,6 +55,7 @@ public class SqsProducerTest { private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3"; private static final String SAMPLE_MESSAGE_HEADER_NAME_4 = "CamelHeader_1"; private static final String SAMPLE_MESSAGE_HEADER_VALUE_4 = "testValue"; + private static final String SAMPLE_EXCHANGE_ID = "ID:whatever-the-hostname-is-32818-1506943497897-1:1:8:1:75939"; Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS); @@ -75,12 +76,14 @@ public class SqsProducerTest { sqsConfiguration = new SqsConfiguration(); HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy(); sqsConfiguration.setDelaySeconds(Integer.valueOf(0)); + sqsConfiguration.setQueueName("queueName"); when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient); when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration); when(amazonSQSClient.sendMessage(any(SendMessageRequest.class))).thenReturn(sendMessageResult); when(exchange.getOut()).thenReturn(outMessage); when(exchange.getIn()).thenReturn(inMessage); when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly); + when(exchange.getExchangeId()).thenReturn(SAMPLE_EXCHANGE_ID); when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY); when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL); when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy); @@ -194,8 +197,92 @@ public class SqsProducerTest { .getStringValue()); assertEquals(3, capture.getValue().getMessageAttributes().size()); } + + @Test + public void itSetsMessageGroupIdUsingConstantStrategy() throws Exception { + sqsConfiguration.setQueueName("queueName.fifo"); + sqsConfiguration.setMessageGroupIdStrategy("useConstant"); + + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals("CamelSingleMessageGroup", capture.getValue().getMessageGroupId()); + + } + @Test + public void itSetsMessageGroupIdUsingExchangeIdStrategy() throws Exception { + sqsConfiguration.setQueueName("queueName.fifo"); + sqsConfiguration.setMessageGroupIdStrategy("useExchangeId"); + + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageGroupId()); + + } + @Test + public void itSetsMessageGroupIdUsingHeaderValueStrategy() throws Exception { + sqsConfiguration.setQueueName("queueName.fifo"); + sqsConfiguration.setMessageGroupIdStrategy("useHeaderValue"); + when(inMessage.getHeader(SqsConstants.MESSAGE_GROUP_ID_HEADER, String.class)).thenReturn("my-group-id"); + + underTest.process(exchange); + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals("my-group-id", capture.getValue().getMessageGroupId()); + + } + + @Test + public void itSetsMessageDedpulicationIdUsingExchangeIdStrategy() throws Exception { + sqsConfiguration.setQueueName("queueName.fifo"); + sqsConfiguration.setMessageGroupIdStrategy("useConstant"); + sqsConfiguration.setMessageDeduplicationIdStrategy("useExchangeId"); + + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageDeduplicationId()); + + } + + @Test + public void itSetsMessageDedpulicationIdUsingExchangeIdStrategyAsDefault() throws Exception { + sqsConfiguration.setQueueName("queueName.fifo"); + sqsConfiguration.setMessageGroupIdStrategy("useConstant"); + + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertEquals(SAMPLE_EXCHANGE_ID, capture.getValue().getMessageDeduplicationId()); + + } + + @Test + public void itDoesNotSetMessageDedpulicationIdUsingContentBasedDeduplicationStrategy() throws Exception { + sqsConfiguration.setQueueName("queueName.fifo"); + sqsConfiguration.setMessageGroupIdStrategy("useConstant"); + sqsConfiguration.setMessageDeduplicationIdStrategy("useContentBasedDeduplication"); + + underTest.process(exchange); + + ArgumentCaptor<SendMessageRequest> capture = ArgumentCaptor.forClass(SendMessageRequest.class); + verify(amazonSQSClient).sendMessage(capture.capture()); + + assertNull(capture.getValue().getMessageDeduplicationId()); + + } } \ No newline at end of file -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
