Repository: nifi
Updated Branches:
  refs/heads/master 8ed64c908 -> 19bc5ba99


NIFI-1495 Adding support for AWS Kinesis Firehose

This closes #213

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/19bc5ba9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/19bc5ba9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/19bc5ba9

Branch: refs/heads/master
Commit: 19bc5ba999219bc24a2b09246498cfdf23c96c7c
Parents: 8ed64c9
Author: mans2singh <[email protected]>
Authored: Wed Mar 16 09:46:38 2016 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Wed Mar 16 09:56:08 2016 -0400

----------------------------------------------------------------------
 .../AbstractKinesisFirehoseProcessor.java       |  81 ++++
 .../kinesis/firehose/PutKinesisFirehose.java    | 185 +++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../kinesis/firehose/ITPutKinesisFirehose.java  | 402 +++++++++++++++++++
 .../firehose/TestPutKinesisFirehose.java        |  81 ++++
 5 files changed, 750 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/19bc5ba9/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
new file mode 100644
index 0000000..3b607c9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends 
AbstractAWSCredentialsProviderProcessor<AmazonKinesisFirehoseClient> {
+
+    public static final PropertyDescriptor 
KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+            .name("Amazon Kinesis Firehose Delivery Stream Name")
+            .description("The name of kinesis firehose delivery stream")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("Batch size for messages (1-500).")
+            .defaultValue("250")
+            .required(false)
+            .addValidator(StandardValidators.createLongValidator(1, 500, true))
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new 
PropertyDescriptor.Builder()
+            .name("Max message buffer size")
+            .description("Max message buffer")
+            .defaultValue("1 MB")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    /**
+     * Create client using aws credentials provider. This is the preferred way 
for creating clients
+     */
+    @Override
+    protected AmazonKinesisFirehoseClient createClient(final ProcessContext 
context, final AWSCredentialsProvider credentialsProvider, final 
ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider");
+
+        return new AmazonKinesisFirehoseClient(credentialsProvider, config);
+    }
+
+    /**
+     * Create client using AWSCredentails
+     *
+     * @deprecated use {@link #createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonKinesisFirehoseClient createClient(final ProcessContext 
context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials");
+
+        return new AmazonKinesisFirehoseClient(credentials, config);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/19bc5ba9/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
new file mode 100644
index 0000000..f5c3b9f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import 
com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis 
Firehose. "
+    + "In order to send data to firehose, the firehose delivery stream name 
has to be specified.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.message", 
description = "Error message on posting message to AWS Kinesis Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.error.code", 
description = "Error code for the message when posting to AWS Kinesis 
Firehose"),
+    @WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description 
= "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+    /**
+     * Kinesis put record response error message
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = 
"aws.kinesis.firehose.error.message";
+
+    /**
+     * Kinesis put record response error code
+     */
+    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = 
"aws.kinesis.firehose.error.code";
+
+    /**
+     * Kinesis put record response record id
+     */
+    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = 
"aws.kinesis.firehose.record.id";
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, 
MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+                  PROXY_HOST,PROXY_HOST_PORT));
+
+    /**
+     * Max buffer size 1 MB
+     */
+    public static final int MAX_MESSAGE_SIZE = 1000 * 1024;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final long maxBufferSizeBytes = 
context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
+        final String firehoseStreamName = 
context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
+
+        List<FlowFile> flowFiles = new ArrayList<FlowFile>(batchSize);
+
+        long currentBufferSizeBytes = 0;
+
+        for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= 
maxBufferSizeBytes); i++) {
+
+            FlowFile flowFileCandidate = session.get();
+            if ( flowFileCandidate == null )
+                break;
+
+            if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) {
+                flowFileCandidate = handleFlowFileTooBig(session, 
flowFileCandidate, firehoseStreamName);
+                continue;
+            }
+
+            currentBufferSizeBytes += flowFileCandidate.getSize();
+
+            flowFiles.add(flowFileCandidate);
+        }
+
+        final AmazonKinesisFirehoseClient client = getClient();
+
+        try {
+            List<Record> records = new ArrayList<>();
+
+            List<FlowFile> failedFlowFiles = new ArrayList<>();
+            List<FlowFile> successfulFlowFiles = new ArrayList<>();
+
+            // Prepare batch of records
+            for (int i = 0; i < flowFiles.size(); i++) {
+                FlowFile flowFile = flowFiles.get(i);
+
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                session.exportTo(flowFile, baos);
+                records.add(new 
Record().withData(ByteBuffer.wrap(baos.toByteArray())));
+            }
+
+            if ( records.size() > 0 ) {
+                // Send the batch
+                PutRecordBatchRequest putRecordBatchRequest = new 
PutRecordBatchRequest();
+                
putRecordBatchRequest.setDeliveryStreamName(firehoseStreamName);
+                putRecordBatchRequest.setRecords(records);
+                PutRecordBatchResult results = 
client.putRecordBatch(putRecordBatchRequest);
+
+                // Separate out the successful and failed flow files
+                List<PutRecordBatchResponseEntry> responseEntries = 
results.getRequestResponses();
+                for (int i = 0; i < responseEntries.size(); i++ ) {
+                    PutRecordBatchResponseEntry entry = responseEntries.get(i);
+                    FlowFile flowFile = flowFiles.get(i);
+
+                    Map<String,String> attributes = new HashMap<>();
+                    attributes.put(AWS_KINESIS_FIREHOSE_RECORD_ID, 
entry.getRecordId());
+                    flowFile = session.putAttribute(flowFile, 
AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
+                    if ( ! StringUtils.isBlank(entry.getErrorCode()) ) {
+                        attributes.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, 
entry.getErrorCode());
+                        attributes.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, 
entry.getErrorMessage());
+                        flowFile = session.putAllAttributes(flowFile, 
attributes);
+                        failedFlowFiles.add(flowFile);
+                    } else {
+                        flowFile = session.putAllAttributes(flowFile, 
attributes);
+                        successfulFlowFiles.add(flowFile);
+                    }
+                }
+                if ( failedFlowFiles.size() > 0 ) {
+                    session.transfer(failedFlowFiles, REL_FAILURE);
+                    getLogger().error("Failed to publish to kinesis firehose 
{} records {}", new Object[]{firehoseStreamName, failedFlowFiles});
+                }
+                if ( successfulFlowFiles.size() > 0 ) {
+                    session.transfer(successfulFlowFiles, REL_SUCCESS);
+                    getLogger().info("Successfully published to kinesis 
firehose {} records {}", new Object[]{firehoseStreamName, successfulFlowFiles});
+                }
+                records.clear();
+            }
+
+        } catch (final Exception exception) {
+            getLogger().error("Failed to publish to kinesis firehose {} with 
exception {}", new Object[]{flowFiles, exception});
+            session.transfer(flowFiles, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    protected FlowFile handleFlowFileTooBig(final ProcessSession session, 
FlowFile flowFileCandidate,
+            final String firehoseStreamName) {
+        flowFileCandidate = session.putAttribute(flowFileCandidate, 
AWS_KINESIS_FIREHOSE_ERROR_MESSAGE,
+            "record too big " + flowFileCandidate.getSize() + " max allowed " 
+ MAX_MESSAGE_SIZE );
+        session.transfer(flowFileCandidate, REL_FAILURE);
+        getLogger().error("Failed to publish to kinesis firehose {} records {} 
because the size was greater than {} bytes",
+            new Object[]{firehoseStreamName, flowFileCandidate, 
MAX_MESSAGE_SIZE});
+        return flowFileCandidate;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/19bc5ba9/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 4608619..2d5460f 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -20,3 +20,4 @@ org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS
 org.apache.nifi.processors.aws.sqs.DeleteSQS
 org.apache.nifi.processors.aws.lambda.PutLambda
+org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose

http://git-wip-us.apache.org/repos/asf/nifi/blob/19bc5ba9/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
new file mode 100644
index 0000000..32e42b1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.List;
+
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests are 
ignored by default)
+ */
+public class ITPutKinesisFirehose {
+
+    private TestRunner runner;
+    protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
+        runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"deliveryName");
+        runner.assertValid();
+    }
+
    @After
    public void tearDown() throws Exception {
        // Drop the runner so every test starts from a fresh one built in setUp().
        runner = null;
    }
+
+    /**
+     * Comment out ignore for integration tests (requires creds files)
+     */
+    @Test
+    public void testIntegrationSuccess() throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+
+        runner.enqueue("test".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
1);
+
+        final List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+
+        out.assertContentEquals("test".getBytes());
+    }
+
+    /**
+     * Comment out ignore for integration tests (requires creds files)
+     */
+    @Test
+    public void testIntegrationFailedBadStreamName() throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"bad-firehose-s3-test");
+        runner.assertValid();
+
+        runner.enqueue("test".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_FAILURE, 
1);
+
+    }
+
+    @Test
+    public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "2");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
1);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(1,flowFiles.size());
+    }
+
+    @Test
+    public void 
testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "5");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "2 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
2);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+    }
+
+    @Test
+    public void 
testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
2);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+    }
+
+    @Test
+    public void 
testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
2);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+    }
+
+    @Test
+    public void 
testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "2");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.run(2, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
3);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(3,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+    }
+
+    @Test
+    public void 
testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed()
 {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE * 2)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue("hello".getBytes());
+        runner.enqueue(bytes);
+        runner.enqueue("there".getBytes());
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+
+        List<MockFlowFile> flowFilesFailed = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
+        assertEquals(1,flowFilesFailed.size());
+        for (MockFlowFile flowFileFailed : flowFilesFailed) {
+            
assertNotNull(flowFileFailed.getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
+        }
+    }
+
+    @Test
+    public void 
testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() 
throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE * 2)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue("hello".getBytes());
+        runner.enqueue(bytes);
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(1,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+            flowFile.assertContentEquals("hello".getBytes());
+        }
+
+        List<MockFlowFile> flowFilesFailed = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
+        assertEquals(1,flowFilesFailed.size());
+        for (MockFlowFile flowFileFailed : flowFilesFailed) {
+            
assertNotNull(flowFileFailed.getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
+        }
+    }
+
+    @Test
+    public void 
testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() 
throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE * 2)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue("HelloWorld".getBytes());
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(1,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+            flowFile.assertContentEquals("HelloWorld".getBytes());
+        }
+
+        List<MockFlowFile> flowFilesFailed = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
+        assertEquals(1,flowFilesFailed.size());
+        for (MockFlowFile flowFileFailed : flowFilesFailed) {
+            
assertNotNull(flowFileFailed.getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
+        }
+    }
+
+    @Test
+    public void 
testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() 
throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        runner.enqueue("Hello".getBytes());
+        runner.enqueue("World".getBytes());
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+        flowFiles.get(0).assertContentEquals("Hello".getBytes());
+        flowFiles.get(1).assertContentEquals("World".getBytes());
+
+        List<MockFlowFile> flowFilesFailed = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
+        assertEquals(0,flowFilesFailed.size());
+    }
+
+    @Test
+    public void 
testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "5");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
2);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+    }
+
+    @Test
+    public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "10");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[10];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
5);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(5,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+    }
+
+    @Test
+    public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "2");
+        runner.setProperty(PutKinesisFirehose.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"testkinesis");
+        runner.assertValid();
+        byte [] bytes = new byte[10];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes.clone());
+        runner.enqueue(bytes);
+        runner.enqueue(bytes.clone());
+        runner.run(1, true, true);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_SUCCESS, 
2);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_SUCCESS);
+        assertEquals(2,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            
flowFile.assertAttributeExists(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_RECORD_ID);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/19bc5ba9/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/TestPutKinesisFirehose.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/TestPutKinesisFirehose.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/TestPutKinesisFirehose.java
new file mode 100644
index 0000000..cb3f3e8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/TestPutKinesisFirehose.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
import static org.junit.Assert.assertNotNull;

import java.util.Arrays;
import java.util.List;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
+public class TestPutKinesisFirehose {
+    private TestRunner runner;
+    protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+        runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
+        runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
+        
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"deliveryName");
+        runner.assertValid();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        runner = null;
+    }
+
+    @Test
+    public void testCustomValidateBatchSize1Valid() {
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testCustomValidateBatchSize500Valid() {
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
+        runner.assertValid();
+    }
+    @Test
+    public void testCustomValidateBatchSize501InValid() {
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testWithSizeGreaterThan1MB() {
+        runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
+        runner.assertValid();
+        byte [] bytes = new byte[(PutKinesisFirehose.MAX_MESSAGE_SIZE + 1)];
+        for (int i = 0; i < bytes.length; i++) {
+            bytes[i] = 'a';
+        }
+        runner.enqueue(bytes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutKinesisFirehose.REL_FAILURE, 
1);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutKinesisFirehose.REL_FAILURE);
+
+        
assertNotNull(flowFiles.get(0).getAttribute(PutKinesisFirehose.AWS_KINESIS_FIREHOSE_ERROR_MESSAGE));
+    }
+}

Reply via email to