Repository: nifi
Updated Branches:
  refs/heads/master 9874d35b6 -> 0f9b55afb


first commit for aws lambda

corrected attributes of flow file

added shutdown + provenance calls

minor formatting and unused imports correction

removed unused property

updated to populate exception attributes in flow file

updated write attributes


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/180a90d1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/180a90d1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/180a90d1

Branch: refs/heads/master
Commit: 180a90d12b9b36c25c35f28c7e7435de03e0ea7b
Parents: f44eb64
Author: mans2singh <[email protected]>
Authored: Wed Feb 10 20:45:53 2016 -0800
Committer: mans2singh <[email protected]>
Committed: Thu Feb 11 20:59:54 2016 -0800

----------------------------------------------------------------------
 ...AbstractAWSCredentialsProviderProcessor.java |   8 +
 .../aws/lambda/AbstractAWSLambdaProcessor.java  |  72 ++++++
 .../nifi/processors/aws/lambda/PutLambda.java   | 232 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/aws/lambda/ITPutLambdaTest.java  | 146 ++++++++++++
 5 files changed, 459 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/180a90d1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
index f99349d..2e306ac 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.aws;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.processor.ProcessContext;
@@ -75,6 +76,13 @@ public abstract class 
AbstractAWSCredentialsProviderProcessor<ClientType extends
 
      }
 
+    /**
+     * Lifecycle hook annotated with {@link OnShutdown}: shuts down the AWS client
+     * held by this processor, if one has been created.
+     */
+    @OnShutdown
+    public void onShutDown() {
+        if (this.client == null) {
+            // Nothing to release: no client was ever assigned.
+            return;
+        }
+        this.client.shutdown();
+    }
+
     /**
      * Get credentials provider using the {@link AWSCredentialsProviderService}
      * @param context the process context

http://git-wip-us.apache.org/repos/asf/nifi/blob/180a90d1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java
new file mode 100644
index 0000000..a6f96a4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/AbstractAWSLambdaProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.lambda;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.lambda.AWSLambdaClient;
+
+/**
+ * Base class for processors that invoke AWS Lambda functions. It declares the
+ * function name and qualifier properties and creates the {@link AWSLambdaClient}.
+ */
+public abstract class AbstractAWSLambdaProcessor extends AbstractAWSCredentialsProviderProcessor<AWSLambdaClient> {
+
+    public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_NAME = new PropertyDescriptor.Builder()
+            .name("Amazon Lambda Name")
+            .description("The Lambda Function Name")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor AWS_LAMBDA_FUNCTION_QUALIFIER = new PropertyDescriptor.Builder()
+            .name("Amazon Lambda Qualifier (version)")
+            .description("The Lambda Function Version")
+            .required(true)
+            .defaultValue("$LATEST")
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /**
+     * Creates the Lambda client from an AWS credentials provider. This is the preferred way of creating clients.
+     *
+     * @param context the process context
+     * @param credentialsProvider provider supplying the AWS credentials
+     * @param config client configuration passed to the Lambda client
+     * @return a new Lambda client
+     */
+    @Override
+    protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider");
+
+        final AWSLambdaClient lambdaClient = new AWSLambdaClient(credentialsProvider, config);
+        return lambdaClient;
+    }
+
+    /**
+     * Creates the Lambda client from explicit AWSCredentials.
+     *
+     * @param context the process context
+     * @param credentials the AWS credentials
+     * @param config client configuration passed to the Lambda client
+     * @return a new Lambda client
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AWSLambdaClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials");
+
+        final AWSLambdaClient lambdaClient = new AWSLambdaClient(credentials, config);
+        return lambdaClient;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/180a90d1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java
new file mode 100644
index 0000000..9850bd2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/lambda/PutLambda.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.lambda;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.lambda.AWSLambdaClient;
+import com.amazonaws.services.lambda.model.InvalidParameterValueException;
+import com.amazonaws.services.lambda.model.InvalidRequestContentException;
+import com.amazonaws.services.lambda.model.InvocationType;
+import com.amazonaws.services.lambda.model.InvokeRequest;
+import com.amazonaws.services.lambda.model.InvokeResult;
+import com.amazonaws.services.lambda.model.LogType;
+import com.amazonaws.services.lambda.model.RequestTooLargeException;
+import com.amazonaws.services.lambda.model.ResourceNotFoundException;
+import com.amazonaws.services.lambda.model.ServiceException;
+import com.amazonaws.services.lambda.model.TooManyRequestsException;
+import com.amazonaws.services.lambda.model.UnsupportedMediaTypeException;
+import com.amazonaws.util.Base64;
+
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "lambda", "put"})
+@CapabilityDescription("Sends the contents to a specified Amazon Lamba 
Function")
+@WritesAttributes({
+    @WritesAttribute(attribute = "aws.lambda.result.function.error", 
description = "Function error message in result on posting message to AWS 
Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.result.status.code", description 
= "Status code in the result for the message when posting to AWS Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.result.payload", description = 
"Payload in the result from AWS Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.result.log", description = "Log 
in the result of the message posted to Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.exception.message", description = 
"Exception message on invoking from AWS Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.exception.cause", description = 
"Exception cause on invoking from AWS Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.exception.error.code", 
description = "Exception error code on invoking from AWS Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.exception.request.id", 
description = "Exception request id on invoking from AWS Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.exception.status.code", 
description = "Exception status code on invoking from AWS Lambda"),
+    @WritesAttribute(attribute = "aws.lambda.exception.error.type", 
description = "Exception error type on invoking from AWS Lambda")
+    })
+public class PutLambda extends AbstractAWSLambdaProcessor {
+
+    /**
+     * Lambda result function error message
+     */
+    public static final String AWS_LAMBDA_RESULT_FUNCTION_ERROR = 
"aws.lambda.result.function.error";
+
+    /**
+     * Lambda response status code
+     */
+    public static final String AWS_LAMBDA_RESULT_STATUS_CODE = 
"aws.lambda.result.status.code";
+
+    /**
+     * Lambda response log tail (4kb)
+     */
+    public static final String AWS_LAMBDA_RESULT_LOG = "aws.lambda.result.log";
+
+    /**
+     * Lambda payload in response
+     */
+    public static final String AWS_LAMBDA_RESULT_PAYLOAD = 
"aws.lambda.result.payload";
+
+    /**
+     * Lambda exception field
+     */
+    public static final String AWS_LAMBDA_EXCEPTION_MESSAGE = 
"aws.lambda.exception.message";
+
+    /**
+     * Lambda exception field
+     */
+    public static final String AWS_LAMBDA_EXCEPTION_CAUSE = 
"aws.lambda.exception.cause";
+
+    /**
+     * Lambda exception field
+     */
+    public static final String AWS_LAMBDA_EXCEPTION_ERROR_CODE = 
"aws.lambda.exception.error.code";
+
+    /**
+     * Lambda exception field
+     */
+    public static final String AWS_LAMBDA_EXCEPTION_REQUEST_ID = 
"aws.lambda.exception.request.id";
+
+    /**
+     * Lambda exception field
+     */
+    public static final String AWS_LAMBDA_EXCEPTION_STATUS_CODE = 
"aws.lambda.exception.status.code";
+
+    /**
+     * Lambda exception field
+     */
+    public static final String AWS_LAMBDA_EXCEPTION_ERROR_TYPE = 
"aws.lambda.exception.error.type";
+
+    /**
+     * Max request body size
+     */
+    public static final long MAX_REQUEST_SIZE = 6 * 1000 * 1000;
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, 
AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT
+            ));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String functionName = 
context.getProperty(AWS_LAMBDA_FUNCTION_NAME).getValue();
+
+        final String qualifier = 
context.getProperty(AWS_LAMBDA_FUNCTION_QUALIFIER).getValue();
+
+        // Max size of message is 6 MB
+        if ( flowFile.getSize() > MAX_REQUEST_SIZE) {
+            getLogger().error("Max size for request body is 6mb but was {} for 
flow file {} for function {}",
+                new Object[]{flowFile.getSize(), flowFile, functionName});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final AWSLambdaClient client = getClient();
+
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            session.exportTo(flowFile, baos);
+
+            InvokeRequest invokeRequest = new InvokeRequest()
+                .withFunctionName(functionName)
+                
.withLogType(LogType.Tail).withInvocationType(InvocationType.RequestResponse)
+                .withPayload(ByteBuffer.wrap(baos.toByteArray()))
+                .withQualifier(qualifier);
+            long startTime = System.nanoTime();
+
+            InvokeResult result = client.invoke(invokeRequest);
+
+            flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_RESULT_STATUS_CODE, result.getStatusCode().toString());
+
+            if ( !StringUtils.isBlank(result.getLogResult() )) {
+                flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_RESULT_LOG, new 
String(Base64.decode(result.getLogResult()),Charset.defaultCharset()));
+            }
+
+            if ( result.getPayload() != null ) {
+                flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_RESULT_PAYLOAD, new 
String(result.getPayload().array(),Charset.defaultCharset()));
+            }
+
+            if ( ! StringUtils.isBlank(result.getFunctionError()) ){
+                flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_RESULT_FUNCTION_ERROR, result.getFunctionError());
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                session.transfer(flowFile, REL_SUCCESS);
+                final long totalTimeMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+                session.getProvenanceReporter().send(flowFile, functionName, 
totalTimeMillis);
+            }
+        } catch (final InvalidRequestContentException
+            | InvalidParameterValueException
+            | RequestTooLargeException
+            | ResourceNotFoundException
+            | UnsupportedMediaTypeException unrecoverableException) {
+                getLogger().error("Failed to invoke lambda {} with 
unrecoverable exception {} for flow file {}",
+                    new Object[]{functionName, unrecoverableException, 
flowFile});
+                flowFile = populateExceptionAttributes(session, flowFile, 
unrecoverableException);
+                session.transfer(flowFile, REL_FAILURE);
+        } catch (final ServiceException | TooManyRequestsException exception) {
+            getLogger().error("Failed to invoke lambda {} with exception {} 
for flow file {}, therefore penalizing flowfile",
+                new Object[]{functionName, exception, flowFile});
+            flowFile = populateExceptionAttributes(session, flowFile, 
exception);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+        } catch (final Exception exception) {
+            getLogger().error("Failed to invoke lambda {} with exception {} 
for flow file {}",
+                new Object[]{functionName, exception, flowFile});
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    /**
+     * Populate exception attributes in the flow file
+     * @param session process session
+     * @param flowFile the flow file
+     * @param exception exception thrown during invocation
+     * @return FlowFile the updated flow file
+     */
+    private FlowFile populateExceptionAttributes(final ProcessSession session, 
FlowFile flowFile,
+            final AmazonServiceException exception) {
+        flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
+        flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_EXCEPTION_ERROR_CODE, exception.getErrorCode());
+        flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_EXCEPTION_REQUEST_ID, exception.getRequestId());
+        flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_EXCEPTION_STATUS_CODE, Integer.toString(exception.getStatusCode()));
+        if ( exception.getCause() != null )
+            flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_EXCEPTION_CAUSE, exception.getCause().getMessage());
+        flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_EXCEPTION_ERROR_TYPE, exception.getErrorType().toString());
+        flowFile = session.putAttribute(flowFile, 
AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
+        return flowFile;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/180a90d1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d0d1e73..4608619 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,3 +19,4 @@ org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS
 org.apache.nifi.processors.aws.sqs.DeleteSQS
+org.apache.nifi.processors.aws.lambda.PutLambda

http://git-wip-us.apache.org/repos/asf/nifi/blob/180a90d1/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/ITPutLambdaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/ITPutLambdaTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/ITPutLambdaTest.java
new file mode 100644
index 0000000..2291e90
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/lambda/ITPutLambdaTest.java
@@ -0,0 +1,146 @@
+package org.apache.nifi.processors.aws.lambda;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.List;
+
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests are 
ignored by default)
+ */
+public class ITPutLambdaTest {
+
+    private TestRunner runner;
+    protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(PutLambda.class);
+        runner.setProperty(PutLambda.ACCESS_KEY, "abcd");
+        runner.setProperty(PutLambda.SECRET_KEY, "secret key");
+        runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "functionName");
+        runner.assertValid();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        runner = null;
+    }
+
+    @Test
+    public void testSizeGreaterThan6MB() throws Exception {
+        runner = TestRunners.newTestRunner(PutLambda.class);
+        runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
+        runner.assertValid();
+        byte [] largeInput = new byte[6000001];
+        for (int i = 0; i < 6000001; i++) {
+            largeInput[i] = 'a';
+        }
+        runner.enqueue(largeInput);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
+    }
+
+    /**
+     * Comment out ignore for integration tests (requires creds files)
+     */
+    @Test
+    @Ignore
+    public void testIntegrationSuccess() throws Exception {
+        runner = TestRunners.newTestRunner(PutLambda.class);
+        runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
+        runner.assertValid();
+
+        runner.enqueue("{\"test\":\"hi\"}".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutLambda.REL_SUCCESS, 1);
+
+        final List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+        assertNull("Function error should be null " + 
out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), 
out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
+        assertNotNull("log should not be null", 
out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
+        assertEquals("Status should be equal", 
"200",out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
+    }
+
+    /**
+     * Comment out ignore for integration tests (requires creds files)
+     */
+    @Test
+//    @Ignore
+    public void testIntegrationClientErrorBadMessageBody() throws Exception {
+        runner = TestRunners.newTestRunner(PutLambda.class);
+        runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, "hello");
+        runner.assertValid();
+
+        runner.enqueue("badbod".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
+        final List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        final MockFlowFile out = ffs.iterator().next();
+        assertNull("Function error should be null since there is exception"
+            + out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), 
out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
+        assertNull("log should not be null", 
out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
+        assertEquals("Status should be equal", 
null,out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
+        assertEquals("exception error code should be equal", 
"InvalidRequestContentException",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_ERROR_CODE));
+        assertEquals("exception exception error type should be equal", 
"Client",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_ERROR_TYPE));
+        assertEquals("exception exception error code should be equal", 
"400",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_STATUS_CODE));
+        assertTrue("exception exception error message should be start 
with",out.getAttribute(PutLambda.AWS_LAMBDA_EXCEPTION_MESSAGE)
+               .startsWith("Could not parse request body into json: 
Unrecognized token 'badbod': was expecting ('true', 'false' or 'null')"));
+    }
+
+    /**
+     * Comment out ignore for integration tests (requires creds files)
+     */
+    @Test
+    @Ignore
+    public void testIntegrationFailedBadStreamName() throws Exception {
+        runner = TestRunners.newTestRunner(PutLambda.class);
+        runner.setProperty(PutLambda.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutLambda.AWS_LAMBDA_FUNCTION_NAME, 
"bad-function-name");
+        runner.assertValid();
+
+        runner.enqueue("{\"test\":\"hi\"}".getBytes());
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PutLambda.REL_FAILURE, 1);
+        final List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        final MockFlowFile out = ffs.iterator().next();
+        assertNull("Function error should be null since there is exception"
+            + out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR), 
out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_FUNCTION_ERROR));
+        assertNull("log should not be null", 
out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_LOG));
+        assertEquals("Status should be equal", 
null,out.getAttribute(PutLambda.AWS_LAMBDA_RESULT_STATUS_CODE));
+
+    }
+}

Reply via email to