Repository: nifi
Updated Branches:
  refs/heads/master 0a7d14965 -> 9238fdb49


http://git-wip-us.apache.org/repos/asf/nifi/blob/9238fdb4/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
index 22d4d9b..1253e73 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
@@ -18,79 +18,134 @@ package org.apache.nifi.processors.aws.sqs;
 
 import java.util.List;
 
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
-import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
-import org.apache.nifi.processors.aws.sns.PutSNS;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Ignore;
-import org.junit.Test;
 
-@Ignore("For local testing only - interacts with S3 so the credentials file 
must be configured and all necessary buckets created")
-public class TestGetSQS {
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.MessageAttributeValue;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 
-    private final String CREDENTIALS_FILE = System.getProperty("user.home") + 
"/aws-credentials.properties";
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
-    @Test
-    public void testSimpleGet() {
-        final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
-        runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(GetSQS.TIMEOUT, "30 secs");
-        runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000";);
+import static org.junit.Assert.assertEquals;
 
-        runner.run(1);
 
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
-        for (final MockFlowFile mff : flowFiles) {
-            System.out.println(mff.getAttributes());
-            System.out.println(new String(mff.toByteArray()));
-        }
+public class TestGetSQS {
+
+    private TestRunner runner = null;
+    private GetSQS mockGetSQS = null;
+    private AmazonSQSClient actualSQSClient = null;
+    private AmazonSQSClient mockSQSClient = null;
+
+    @Before
+    public void setUp() {
+        mockSQSClient = Mockito.mock(AmazonSQSClient.class);
+        mockGetSQS = new GetSQS() {
+            protected AmazonSQSClient getClient() {
+                actualSQSClient = client;
+                return mockSQSClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockGetSQS);
     }
 
     @Test
-    public void testSimpleGetWithEL() {
-        System.setProperty("test-account-property", "100515378163");
-        System.setProperty("test-queue-property", "test-queue-000000000");
-        final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
-        runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(GetSQS.TIMEOUT, "30 secs");
-        runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/${test-account-property}/${test-queue-property}";);
+    public void testGetMessageNoAutoDelete() {
+        runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";);
+        runner.setProperty(GetSQS.AUTO_DELETE, "false");
+
+        Message message1 = new Message();
+        message1.setBody("TestMessage1");
+        message1.addAttributesEntry("attrib-key-1", "attrib-value-1");
+        MessageAttributeValue messageAttributeValue = new 
MessageAttributeValue();
+        messageAttributeValue.setStringValue("msg-attrib-value-1");
+        message1.addMessageAttributesEntry("msg-attrib-key-1", 
messageAttributeValue);
+        message1.setMD5OfBody("test-md5-hash-1");
+        message1.setMessageId("test-message-id-1");
+        message1.setReceiptHandle("test-receipt-handle-1");
+        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
+                .withMessages(message1);
+        
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
 
         runner.run(1);
 
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
-        for (final MockFlowFile mff : flowFiles) {
-            System.out.println(mff.getAttributes());
-            System.out.println(new String(mff.toByteArray()));
-        }
+        ArgumentCaptor<ReceiveMessageRequest> captureRequest = 
ArgumentCaptor.forClass(ReceiveMessageRequest.class);
+        Mockito.verify(mockSQSClient, 
Mockito.times(1)).receiveMessage(captureRequest.capture());
+        ReceiveMessageRequest request = captureRequest.getValue();
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";,
 request.getQueueUrl());
+        Mockito.verify(mockSQSClient, 
Mockito.never()).deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class));
+
+        runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("sqs.attrib-key-1", "attrib-value-1");
+        ff0.assertAttributeEquals("sqs.msg-attrib-key-1", 
"msg-attrib-value-1");
+        ff0.assertAttributeEquals("hash.value", "test-md5-hash-1");
+        ff0.assertAttributeEquals("hash.algorithm", "md5");
+        ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1");
+        ff0.assertAttributeEquals("sqs.receipt.handle", 
"test-receipt-handle-1");
     }
 
     @Test
-    public void testSimpleGetUsingCredentialsProviderService() throws 
Throwable {
-        final TestRunner runner = TestRunners.newTestRunner(new GetSQS());
-
-        runner.setProperty(GetSQS.TIMEOUT, "30 secs");
-        String queueUrl = "Add queue url here";
-        runner.setProperty(GetSQS.QUEUE_URL, queueUrl);
+    public void testGetNoMessages() {
+        runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";);
+        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
+        
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
 
-        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.run(1);
 
-        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        ArgumentCaptor<ReceiveMessageRequest> captureRequest = 
ArgumentCaptor.forClass(ReceiveMessageRequest.class);
+        Mockito.verify(mockSQSClient, 
Mockito.times(1)).receiveMessage(captureRequest.capture());
+        ReceiveMessageRequest request = captureRequest.getValue();
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";,
 request.getQueueUrl());
 
-        runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, 
System.getProperty("user.home") + "/aws-credentials.properties");
-        runner.enableControllerService(serviceImpl);
+        runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 0);
+    }
 
-        runner.assertValid(serviceImpl);
+    @Test
+    public void testGetMessageAndAutoDelete() {
+        runner.setProperty(GetSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";);
+        runner.setProperty(GetSQS.AUTO_DELETE, "true");
+
+        Message message1 = new Message();
+        message1.setBody("TestMessage1");
+        message1.setMessageId("test-message-id-1");
+        message1.setReceiptHandle("test-receipt-handle-1");
+        Message message2 = new Message();
+        message2.setBody("TestMessage2");
+        message2.setMessageId("test-message-id-2");
+        message2.setReceiptHandle("test-receipt-handle-2");
+        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult()
+                .withMessages(message1, message2);
+        
Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
 
-        runner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, 
"awsCredentialsProvider");
         runner.run(1);
 
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
-        for (final MockFlowFile mff : flowFiles) {
-            System.out.println(mff.getAttributes());
-            System.out.println(new String(mff.toByteArray()));
-        }
+        ArgumentCaptor<ReceiveMessageRequest> captureReceiveRequest = 
ArgumentCaptor.forClass(ReceiveMessageRequest.class);
+        Mockito.verify(mockSQSClient, 
Mockito.times(1)).receiveMessage(captureReceiveRequest.capture());
+        ReceiveMessageRequest receiveRequest = 
captureReceiveRequest.getValue();
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";,
 receiveRequest.getQueueUrl());
+
+        ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = 
ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
+        Mockito.verify(mockSQSClient, 
Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture());
+        DeleteMessageBatchRequest deleteRequest = 
captureDeleteRequest.getValue();
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";,
 deleteRequest.getQueueUrl());
+        assertEquals("test-message-id-1", 
deleteRequest.getEntries().get(0).getId());
+        assertEquals("test-message-id-2", 
deleteRequest.getEntries().get(1).getId());
+
+        runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 2);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1");
+        MockFlowFile ff1 = flowFiles.get(1);
+        ff1.assertAttributeEquals("sqs.message.id", "test-message-id-2");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9238fdb4/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
index 7e21b8c..39bef8b 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java
@@ -17,70 +17,90 @@
 package org.apache.nifi.processors.aws.sqs;
 
 import java.io.IOException;
-import java.nio.file.Paths;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
-import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
-import org.apache.nifi.processors.aws.sns.PutSNS;
-import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.AmazonSQSException;
+import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
+import com.amazonaws.services.sqs.model.SendMessageBatchResult;
+
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+
 
-@Ignore("For local testing only - interacts with S3 so the credentials file 
must be configured and all necessary buckets created")
 public class TestPutSQS {
 
-    private final String CREDENTIALS_FILE = System.getProperty("user.home") + 
"/aws-credentials.properties";
+    private TestRunner runner = null;
+    private PutSQS mockPutSQS = null;
+    private AmazonSQSClient actualSQSClient = null;
+    private AmazonSQSClient mockSQSClient = null;
+
+    @Before
+    public void setUp() {
+        mockSQSClient = Mockito.mock(AmazonSQSClient.class);
+        mockPutSQS = new PutSQS() {
+            protected AmazonSQSClient getClient() {
+                actualSQSClient = client;
+                return mockSQSClient;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockPutSQS);
+    }
 
     @Test
     public void testSimplePut() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
-        runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutSQS.TIMEOUT, "30 secs");
-        runner.setProperty(PutSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000";);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PutSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";);
         Assert.assertTrue(runner.setProperty("x-custom-prop", 
"hello").isValid());
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "1.txt");
-        runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
+        runner.enqueue("TestMessageBody", attrs);
+
+        SendMessageBatchResult batchResult = new SendMessageBatchResult();
+        
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult);
+
         runner.run(1);
 
+        ArgumentCaptor<SendMessageBatchRequest> captureRequest = 
ArgumentCaptor.forClass(SendMessageBatchRequest.class);
+        Mockito.verify(mockSQSClient, 
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
+        SendMessageBatchRequest request = captureRequest.getValue();
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";,
 request.getQueueUrl());
+        assertEquals("hello", 
request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue());
+        assertEquals("TestMessageBody", 
request.getEntries().get(0).getMessageBody());
+
         runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1);
     }
 
     @Test
-    public void testSimplePutUsingCredentialsProviderService() throws 
Throwable {
-        final TestRunner runner = TestRunners.newTestRunner(new PutSQS());
-
-        runner.setProperty(PutSQS.TIMEOUT, "30 secs");
-        String queueUrl = "Add queue url here";
-        runner.setProperty(PutSQS.QUEUE_URL, queueUrl);
+    public void testPutException() throws IOException {
         runner.setValidateExpressionUsage(false);
-        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
-
-        runner.addControllerService("awsCredentialsProvider", serviceImpl);
-
-        runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, 
System.getProperty("user.home") + "/aws-credentials.properties");
-        runner.enableControllerService(serviceImpl);
-
-        runner.assertValid(serviceImpl);
+        runner.setProperty(PutSQS.QUEUE_URL, 
"https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";);
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "1.txt");
-        runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
-                runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, 
"awsCredentialsProvider");
+        runner.enqueue("TestMessageBody", attrs);
+
+        
Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new
 AmazonSQSException("TestFail"));
+
         runner.run(1);
 
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutSQS.REL_SUCCESS);
-        for (final MockFlowFile mff : flowFiles) {
-            System.out.println(mff.getAttributes());
-            System.out.println(new String(mff.toByteArray()));
-        }
+        ArgumentCaptor<SendMessageBatchRequest> captureRequest = 
ArgumentCaptor.forClass(SendMessageBatchRequest.class);
+        Mockito.verify(mockSQSClient, 
Mockito.times(1)).sendMessageBatch(captureRequest.capture());
+        SendMessageBatchRequest request = captureRequest.getValue();
+        
assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000";,
 request.getQueueUrl());
+        assertEquals("TestMessageBody", 
request.getEntries().get(0).getMessageBody());
 
+        runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1);
     }
+
 }

Reply via email to