Repository: nifi Updated Branches: refs/heads/master 0a7d14965 -> 9238fdb49
http://git-wip-us.apache.org/repos/asf/nifi/blob/9238fdb4/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java index 22d4d9b..1253e73 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java @@ -18,79 +18,134 @@ package org.apache.nifi.processors.aws.sqs; import java.util.List; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; -import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; -import org.apache.nifi.processors.aws.sns.PutSNS; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.Ignore; -import org.junit.Test; -@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") -public class TestGetSQS { +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; - private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; - @Test - public void testSimpleGet() { - final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); - runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(GetSQS.TIMEOUT, "30 secs"); - runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); +import static org.junit.Assert.assertEquals; - runner.run(1); - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } +public class TestGetSQS { + + private TestRunner runner = null; + private GetSQS mockGetSQS = null; + private AmazonSQSClient actualSQSClient = null; + private AmazonSQSClient mockSQSClient = null; + + @Before + public void setUp() { + mockSQSClient = Mockito.mock(AmazonSQSClient.class); + mockGetSQS = new GetSQS() { + protected AmazonSQSClient getClient() { + actualSQSClient = client; + return mockSQSClient; + } + }; + runner = TestRunners.newTestRunner(mockGetSQS); } @Test - public void testSimpleGetWithEL() { - System.setProperty("test-account-property", "100515378163"); - System.setProperty("test-queue-property", "test-queue-000000000"); - final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); - runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(GetSQS.TIMEOUT, "30 secs"); - runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/${test-account-property}/${test-queue-property}"); + public void testGetMessageNoAutoDelete() { + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + runner.setProperty(GetSQS.AUTO_DELETE, "false"); + + Message message1 = new Message(); + message1.setBody("TestMessage1"); + message1.addAttributesEntry("attrib-key-1", "attrib-value-1"); + MessageAttributeValue messageAttributeValue = new MessageAttributeValue(); + messageAttributeValue.setStringValue("msg-attrib-value-1"); + message1.addMessageAttributesEntry("msg-attrib-key-1", messageAttributeValue); + message1.setMD5OfBody("test-md5-hash-1"); + message1.setMessageId("test-message-id-1"); + message1.setReceiptHandle("test-receipt-handle-1"); + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult() + .withMessages(message1); + Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); runner.run(1); - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } + ArgumentCaptor<ReceiveMessageRequest> captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture()); + ReceiveMessageRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + Mockito.verify(mockSQSClient, Mockito.never()).deleteMessageBatch(Mockito.any(DeleteMessageBatchRequest.class)); + + runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("sqs.attrib-key-1", "attrib-value-1"); + ff0.assertAttributeEquals("sqs.msg-attrib-key-1", "msg-attrib-value-1"); + ff0.assertAttributeEquals("hash.value", "test-md5-hash-1"); + ff0.assertAttributeEquals("hash.algorithm", "md5"); + ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1"); + ff0.assertAttributeEquals("sqs.receipt.handle", "test-receipt-handle-1"); } @Test - public void testSimpleGetUsingCredentialsProviderService() throws Throwable { - final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); - - runner.setProperty(GetSQS.TIMEOUT, "30 secs"); - String queueUrl = "Add queue url here"; - runner.setProperty(GetSQS.QUEUE_URL, queueUrl); + public void testGetNoMessages() { + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(); + Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); - final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.run(1); - runner.addControllerService("awsCredentialsProvider", serviceImpl); + ArgumentCaptor<ReceiveMessageRequest> captureRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureRequest.capture()); + ReceiveMessageRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); - runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); - runner.enableControllerService(serviceImpl); + runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 0); + } - runner.assertValid(serviceImpl); + @Test + public void testGetMessageAndAutoDelete() { + runner.setProperty(GetSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); + runner.setProperty(GetSQS.AUTO_DELETE, "true"); + + Message message1 = new Message(); + message1.setBody("TestMessage1"); + message1.setMessageId("test-message-id-1"); + message1.setReceiptHandle("test-receipt-handle-1"); + Message message2 = new Message(); + message2.setBody("TestMessage2"); + message2.setMessageId("test-message-id-2"); + message2.setReceiptHandle("test-receipt-handle-2"); + ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult() + .withMessages(message1, message2); + Mockito.when(mockSQSClient.receiveMessage(Mockito.any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult); - runner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); runner.run(1); - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } + ArgumentCaptor<ReceiveMessageRequest> captureReceiveRequest = ArgumentCaptor.forClass(ReceiveMessageRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).receiveMessage(captureReceiveRequest.capture()); + ReceiveMessageRequest receiveRequest = captureReceiveRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", receiveRequest.getQueueUrl()); + + ArgumentCaptor<DeleteMessageBatchRequest> captureDeleteRequest = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).deleteMessageBatch(captureDeleteRequest.capture()); + DeleteMessageBatchRequest deleteRequest = captureDeleteRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", deleteRequest.getQueueUrl()); + assertEquals("test-message-id-1", deleteRequest.getEntries().get(0).getId()); + assertEquals("test-message-id-2", deleteRequest.getEntries().get(1).getId()); + + runner.assertAllFlowFilesTransferred(GetSQS.REL_SUCCESS, 2); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + MockFlowFile ff0 = flowFiles.get(0); + ff0.assertAttributeEquals("sqs.message.id", "test-message-id-1"); + MockFlowFile ff1 = flowFiles.get(1); + ff1.assertAttributeEquals("sqs.message.id", "test-message-id-2"); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9238fdb4/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java index 7e21b8c..39bef8b 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestPutSQS.java @@ -17,70 +17,90 @@ package org.apache.nifi.processors.aws.sqs; import java.io.IOException; -import java.nio.file.Paths; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; -import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; -import org.apache.nifi.processors.aws.sns.PutSNS; -import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; + +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.AmazonSQSException; +import com.amazonaws.services.sqs.model.SendMessageBatchRequest; +import com.amazonaws.services.sqs.model.SendMessageBatchResult; + import org.junit.Assert; -import org.junit.Ignore; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; + -@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created") public class TestPutSQS { - private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + private TestRunner runner = null; + private PutSQS mockPutSQS = null; + private AmazonSQSClient actualSQSClient = null; + private AmazonSQSClient mockSQSClient = null; + + @Before + public void setUp() { + mockSQSClient = Mockito.mock(AmazonSQSClient.class); + mockPutSQS = new PutSQS() { + protected AmazonSQSClient getClient() { + actualSQSClient = client; + return mockSQSClient; + } + }; + runner = TestRunners.newTestRunner(mockPutSQS); + } @Test public void testSimplePut() throws IOException { - final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); - runner.setProperty(PutSNS.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutSQS.TIMEOUT, "30 secs"); - runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/100515378163/test-queue-000000000"); + runner.setValidateExpressionUsage(false); + runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "1.txt"); - runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); + runner.enqueue("TestMessageBody", attrs); + + SendMessageBatchResult batchResult = new SendMessageBatchResult(); + Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenReturn(batchResult); + runner.run(1); + ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); + SendMessageBatchRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + assertEquals("hello", request.getEntries().get(0).getMessageAttributes().get("x-custom-prop").getStringValue()); + assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); + runner.assertAllFlowFilesTransferred(PutSQS.REL_SUCCESS, 1); } @Test - public void testSimplePutUsingCredentialsProviderService() throws Throwable { - final TestRunner runner = TestRunners.newTestRunner(new PutSQS()); - - runner.setProperty(PutSQS.TIMEOUT, "30 secs"); - String queueUrl = "Add queue url here"; - runner.setProperty(PutSQS.QUEUE_URL, queueUrl); + public void testPutException() throws IOException { runner.setValidateExpressionUsage(false); - final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); - - runner.addControllerService("awsCredentialsProvider", serviceImpl); - - runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); - runner.enableControllerService(serviceImpl); - - runner.assertValid(serviceImpl); + runner.setProperty(PutSQS.QUEUE_URL, "https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000"); final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "1.txt"); - runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); - runner.setProperty(PutSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.enqueue("TestMessageBody", attrs); + + Mockito.when(mockSQSClient.sendMessageBatch(Mockito.any(SendMessageBatchRequest.class))).thenThrow(new AmazonSQSException("TestFail")); + runner.run(1); - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSQS.REL_SUCCESS); - for (final MockFlowFile mff : flowFiles) { - System.out.println(mff.getAttributes()); - System.out.println(new String(mff.toByteArray())); - } + ArgumentCaptor<SendMessageBatchRequest> captureRequest = ArgumentCaptor.forClass(SendMessageBatchRequest.class); + Mockito.verify(mockSQSClient, Mockito.times(1)).sendMessageBatch(captureRequest.capture()); + SendMessageBatchRequest request = captureRequest.getValue(); + assertEquals("https://sqs.us-west-2.amazonaws.com/123456789012/test-queue-000000000", request.getQueueUrl()); + assertEquals("TestMessageBody", request.getEntries().get(0).getMessageBody()); + runner.assertAllFlowFilesTransferred(PutSQS.REL_FAILURE, 1); } + }
