Repository: nifi
Updated Branches:
  refs/heads/master c3059939e -> 10254a03c


NIFI-4015 NIFI-3999 Fix DeleteSQS Issues

* Avoid exception by providing id in DeleteMessageBatchRequestEntry
* Include receipt handle property descriptor for user configuration

Signed-off-by: Pierre Villard <[email protected]>

This closes #1888.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/10254a03
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/10254a03
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/10254a03

Branch: refs/heads/master
Commit: 10254a03c2127c58d8efc2fecfba3c4e2147143d
Parents: c305993
Author: James Wing <[email protected]>
Authored: Sat Jun 3 12:49:08 2017 -0700
Committer: Pierre Villard <[email protected]>
Committed: Wed May 16 18:15:20 2018 +0200

----------------------------------------------------------------------
 .../nifi/processors/aws/sqs/DeleteSQS.java      |  9 ++-
 .../nifi/processors/aws/sqs/ITDeleteSQS.java    | 83 ++++++++++++++++++++
 .../nifi/processors/aws/sqs/TestDeleteSQS.java  | 28 +++++--
 3 files changed, 110 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/10254a03/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 5beead5..e2cef50 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -55,7 +56,8 @@ public class DeleteSQS extends AbstractSQSProcessor {
             .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, PROXY_HOST, 
PROXY_HOST_PORT));
+            Arrays.asList(QUEUE_URL, RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE,
+                    REGION, TIMEOUT, PROXY_HOST, PROXY_HOST_PORT));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -80,7 +82,10 @@ public class DeleteSQS extends AbstractSQSProcessor {
 
         for (final FlowFile flowFile : flowFiles) {
             final DeleteMessageBatchRequestEntry entry = new 
DeleteMessageBatchRequestEntry();
-            
entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue());
+            String receiptHandle = 
context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue();
+            entry.setReceiptHandle(receiptHandle);
+            String entryId = flowFile.getAttribute(CoreAttributes.UUID.key());
+            entry.setId(entryId);
             entries.add(entry);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/10254a03/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
new file mode 100644
index 0000000..04fa676
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/ITDeleteSQS.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.sqs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SendMessageResult;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+@Ignore("For local testing only - interacts with S3 so the credentials file 
must be configured and all necessary queues created")
+public class ITDeleteSQS {
+
+    private final String CREDENTIALS_FILE = System.getProperty("user.home") + 
"/aws-credentials.properties";
+    private final String TEST_QUEUE_URL = 
"https://sqs.us-west-2.amazonaws.com/123456789012/nifi-test-queue";;
+    private final String TEST_REGION = "us-west-2";
+    AmazonSQSClient sqsClient = null;
+
+    @Before
+    public void setUp() throws IOException {
+        PropertiesCredentials credentials = new PropertiesCredentials(new 
File(CREDENTIALS_FILE));
+        sqsClient = new AmazonSQSClient(credentials);
+        sqsClient.withRegion(Regions.fromName(TEST_REGION));
+    }
+
+    @Test
+    public void testSimpleDelete() throws IOException {
+        // Setup - put one message in queue
+        SendMessageResult sendMessageResult = 
sqsClient.sendMessage(TEST_QUEUE_URL, "Test message");
+        assertEquals(200, 
sendMessageResult.getSdkHttpMetadata().getHttpStatusCode());
+
+        // Setup - receive message to get receipt handle
+        ReceiveMessageResult receiveMessageResult = 
sqsClient.receiveMessage(TEST_QUEUE_URL);
+        assertEquals(200, 
receiveMessageResult.getSdkHttpMetadata().getHttpStatusCode());
+        Message deleteMessage = receiveMessageResult.getMessages().get(0);
+        String receiptHandle = deleteMessage.getReceiptHandle();
+
+        // Test - delete message with DeleteSQS
+        final TestRunner runner = TestRunners.newTestRunner(new DeleteSQS());
+        runner.setProperty(DeleteSQS.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(DeleteSQS.QUEUE_URL, TEST_QUEUE_URL);
+        runner.setProperty(DeleteSQS.REGION, TEST_REGION);
+        final Map<String, String> ffAttributes = new HashMap<>();
+        ffAttributes.put("filename", "1.txt");
+        ffAttributes.put("sqs.receipt.handle", receiptHandle);
+        runner.enqueue("TestMessageBody", ffAttributes);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/10254a03/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
index 11cc7cd..feb0075 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestDeleteSQS.java
@@ -16,12 +16,9 @@
  */
 package org.apache.nifi.processors.aws.sqs;
 
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
@@ -48,11 +45,6 @@ public class TestDeleteSQS {
     public void setUp() {
         mockSQSClient = Mockito.mock(AmazonSQSClient.class);
         mockDeleteSQS = new DeleteSQS() {
-
-            protected List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
-                return Arrays.asList(RECEIPT_HANDLE, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT, 
PROXY_HOST, PROXY_HOST_PORT);
-            }
-
             protected AmazonSQSClient getClient() {
                 actualSQSClient = client;
                 return mockSQSClient;
@@ -82,6 +74,26 @@ public class TestDeleteSQS {
     }
 
     @Test
+    public void testDeleteWithCustomReceiptHandle() {
+        runner.setProperty(DeleteSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";);
+        runner.setProperty(DeleteSQS.RECEIPT_HANDLE, 
"${custom.receipt.handle}");
+        final Map<String, String> ffAttributes = new HashMap<>();
+        ffAttributes.put("filename", "1.txt");
+        ffAttributes.put("custom.receipt.handle", "test-receipt-handle-1");
+        runner.enqueue("TestMessageBody", ffAttributes);
+
+        runner.assertValid();
+        runner.run(1);
+
+        ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = 
ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
+        Mockito.verify(mockSQSClient, 
Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
+        DeleteMessageBatchRequest deleteRequest = 
captureDeleteRequest.getValue();
+        assertEquals("test-receipt-handle-1", 
deleteRequest.getEntries().get(0).getReceiptHandle());
+
+        runner.assertAllFlowFilesTransferred(DeleteSQS.REL_SUCCESS, 1);
+    }
+
+    @Test
     public void testDeleteException() {
         runner.setProperty(DeleteSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";);
         final Map<String, String> ff1Attributes = new HashMap<>();

Reply via email to