NIFI-1516 Provide AWS DynamoDB Delete/Put/Get processors

This closes #224.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f115a5c
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f115a5c
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f115a5c

Branch: refs/heads/0.x
Commit: 0f115a5ce008043559f9004d0f15bff58b86a421
Parents: a8afc15
Author: mans2singh <[email protected]>
Authored: Fri Apr 8 11:21:39 2016 -0400
Committer: Aldrin Piri <[email protected]>
Committed: Fri Apr 8 11:23:32 2016 -0400

----------------------------------------------------------------------
 .../processors/aws/AbstractAWSProcessor.java    |   7 +
 .../aws/dynamodb/AbstractDynamoDBProcessor.java | 338 ++++++++++++
 .../AbstractWriteDynamoDBProcessor.java         |  69 +++
 .../processors/aws/dynamodb/DeleteDynamoDB.java | 161 ++++++
 .../processors/aws/dynamodb/GetDynamoDB.java    | 197 +++++++
 .../nifi/processors/aws/dynamodb/ItemKeys.java  |  53 ++
 .../processors/aws/dynamodb/PutDynamoDB.java    | 197 +++++++
 .../org.apache.nifi.processor.Processor         |   5 +-
 .../aws/dynamodb/DeleteDynamoDBTest.java        | 368 +++++++++++++
 .../aws/dynamodb/GetDynamoDBTest.java           | 530 +++++++++++++++++++
 .../aws/dynamodb/ITAbstractDynamoDBTest.java    | 139 +++++
 .../dynamodb/ITPutGetDeleteGetDynamoDBTest.java | 428 +++++++++++++++
 .../processors/aws/dynamodb/ItemKeysTest.java   |  68 +++
 .../aws/dynamodb/PutDynamoDBTest.java           | 460 ++++++++++++++++
 14 files changed, 3019 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index a2eb9df..e2c2196 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -287,4 +288,10 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
 
     }
 
+    @OnShutdown
+    public void onShutdown() {
+        if ( getClient() != null ) {
+            getClient().shutdown();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
new file mode 100644
index 0000000..df1a9ff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.math.BigDecimal;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+
+/**
+ * Base class for Nifi dynamo db related processors
+ *
+ * @see DeleteDynamoDB
+ * @see PutDynamoDB
+ * @see GetDynamoDB
+ */
+public abstract class AbstractDynamoDBProcessor extends 
AbstractAWSCredentialsProviderProcessor<AmazonDynamoDBClient> {
+
+    public static final Relationship REL_UNPROCESSED = new 
Relationship.Builder().name("unprocessed")
+            .description("FlowFiles are routed to unprocessed relationship 
when DynamoDB is not able to process "
+               + "all the items in the request. Typical reasons are 
insufficient table throughput capacity and exceeding the maximum bytes per 
request. "
+               + "Unprocessed FlowFiles can be retried with a new 
request.").build();
+
+    public static final AllowableValue ALLOWABLE_VALUE_STRING = new 
AllowableValue("string");
+    public static final AllowableValue ALLOWABLE_VALUE_NUMBER = new 
AllowableValue("number");
+
+    public static final String DYNAMODB_KEY_ERROR_UNPROCESSED = 
"dynamodb.key.error.unprocessed";
+    public static final String DYNAMODB_RANGE_KEY_VALUE_ERROR = 
"dynmodb.range.key.value.error";
+    public static final String DYNAMODB_HASH_KEY_VALUE_ERROR = 
"dynmodb.hash.key.value.error";
+    public static final String DYNAMODB_KEY_ERROR_NOT_FOUND = 
"dynamodb.key.error.not.found";
+    public static final String DYNAMODB_ERROR_EXCEPTION_MESSAGE = 
"dynamodb.error.exception.message";
+    public static final String DYNAMODB_ERROR_CODE = "dynamodb.error.code";
+    public static final String DYNAMODB_ERROR_MESSAGE = 
"dynamodb.error.message";
+    public static final String DYNAMODB_ERROR_TYPE = "dynamodb.error.type";
+    public static final String DYNAMODB_ERROR_SERVICE = 
"dynamodb.error.service";
+    public static final String DYNAMODB_ERROR_RETRYABLE = 
"dynamodb.error.retryable";
+    public static final String DYNAMODB_ERROR_REQUEST_ID = 
"dynamodb.error.request.id";
+    public static final String DYNAMODB_ERROR_STATUS_CODE = 
"dynamodb.error.status.code";
+    public static final String DYNAMODB_ITEM_HASH_KEY_VALUE = "  
dynamodb.item.hash.key.value";
+    public static final String DYNAMODB_ITEM_RANGE_KEY_VALUE = "  
dynamodb.item.range.key.value";
+    public static final String DYNAMODB_ITEM_IO_ERROR = 
"dynamodb.item.io.error";
+    public static final String AWS_DYNAMO_DB_ITEM_SIZE_ERROR = 
"dynamodb.item.size.error";
+
+    protected static final String DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE = 
"DynamoDB key not found : ";
+
+    public static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+            .name("Table Name")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The DynamoDB table name")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE = new 
PropertyDescriptor.Builder()
+            .name("Hash Key Value")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key value of the item")
+            .defaultValue("${dynamodb.item.hash.key.value}")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE = new 
PropertyDescriptor.Builder()
+            .name("Range Key Value")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("${dynamodb.item.range.key.value}")
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_VALUE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Hash Key Value Type")
+            .required(true)
+            .description("The hash key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_VALUE_TYPE = new 
PropertyDescriptor.Builder()
+            .name("Range Key Value Type")
+            .required(true)
+            .description("The range key value type of the item")
+            .defaultValue(ALLOWABLE_VALUE_STRING.getValue())
+            .allowableValues(ALLOWABLE_VALUE_STRING, ALLOWABLE_VALUE_NUMBER)
+            .build();
+
+    public static final PropertyDescriptor HASH_KEY_NAME = new 
PropertyDescriptor.Builder()
+            .name("Hash Key Name")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The hash key name of the item")
+            .build();
+
+    public static final PropertyDescriptor RANGE_KEY_NAME = new 
PropertyDescriptor.Builder()
+            .name("Range Key Name")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The range key name of the item")
+            .build();
+
+    public static final PropertyDescriptor JSON_DOCUMENT = new 
PropertyDescriptor.Builder()
+            .name("Json Document attribute")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The Json document to be retrieved from the dynamodb 
item")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Batch items for each request (between 1 and 50)")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.createLongValidator(1, 50, true))
+            .defaultValue("1")
+            .description("The items to be retrieved in one batch")
+            .build();
+
+    public static final PropertyDescriptor DOCUMENT_CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Character set of document")
+            .description("Character set of data in the document")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .required(true)
+            .defaultValue(Charset.defaultCharset().name())
+            .build();
+
+    protected volatile DynamoDB dynamoDB;
+
+    public static final Set<Relationship> dynamoDBrelationships = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, 
REL_UNPROCESSED)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return dynamoDBrelationships;
+    }
+
+    /**
+     * Create client using credentials provider. This is the preferred way for 
creating clients
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, 
final AWSCredentialsProvider credentialsProvider, final ClientConfiguration 
config) {
+        getLogger().debug("Creating client with credentials provider");
+
+        final AmazonDynamoDBClient client = new 
AmazonDynamoDBClient(credentialsProvider, config);
+
+        return client;
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonDynamoDBClient createClient(final ProcessContext context, 
final AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().debug("Creating client with aws credentials");
+
+        final AmazonDynamoDBClient client = new 
AmazonDynamoDBClient(credentials, config);
+
+        return client;
+    }
+
+    protected Object getValue(ProcessContext context, PropertyDescriptor type, 
PropertyDescriptor value, FlowFile flowFile) {
+        if ( 
context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) 
{
+            return 
context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            return new 
BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue());
+        }
+    }
+
+    protected Object getAttributeValue(ProcessContext context, 
PropertyDescriptor propertyType, AttributeValue value) {
+        if ( 
context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue()))
 {
+            if ( value == null ) return null;
+            else return value.getS();
+        } else {
+            if ( value == null ) return null;
+            else return new BigDecimal(value.getN());
+        }
+    }
+
+    protected synchronized DynamoDB getDynamoDB() {
+        if ( dynamoDB == null )
+            dynamoDB = new DynamoDB(client);
+        return dynamoDB;
+    }
+
+    protected Object getValue(Map<String, AttributeValue> item, String 
keyName, String valueType) {
+        if ( ALLOWABLE_VALUE_STRING.getValue().equals(valueType)) {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getS();
+        } else {
+            AttributeValue val = item.get(keyName);
+            if ( val == null ) return val;
+            else return val.getN();
+        }
+    }
+
+    protected List<FlowFile> processException(final ProcessSession session, 
List<FlowFile> flowFiles, Exception exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            flowFile = session.putAttribute(flowFile, 
DYNAMODB_ERROR_EXCEPTION_MESSAGE, exception.getMessage() );
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processClientException(final ProcessSession 
session, List<FlowFile> flowFiles,
+            AmazonClientException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, 
exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, 
Boolean.toString(exception.isRetryable()));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    protected List<FlowFile> processServiceException(final ProcessSession 
session, List<FlowFile> flowFiles,
+            AmazonServiceException exception) {
+        List<FlowFile> failedFlowFiles = new ArrayList<>();
+        for (FlowFile flowFile : flowFiles) {
+            Map<String,String> attributes = new HashMap<>();
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, 
exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_CODE, exception.getErrorCode() );
+            attributes.put(DYNAMODB_ERROR_MESSAGE, exception.getErrorMessage() 
);
+            attributes.put(DYNAMODB_ERROR_TYPE, 
exception.getErrorType().name() );
+            attributes.put(DYNAMODB_ERROR_SERVICE, exception.getServiceName() 
);
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, 
Boolean.toString(exception.isRetryable()));
+            attributes.put(DYNAMODB_ERROR_REQUEST_ID, exception.getRequestId() 
);
+            attributes.put(DYNAMODB_ERROR_STATUS_CODE, 
Integer.toString(exception.getStatusCode()) );
+            attributes.put(DYNAMODB_ERROR_EXCEPTION_MESSAGE, 
exception.getMessage() );
+            attributes.put(DYNAMODB_ERROR_RETRYABLE, 
Boolean.toString(exception.isRetryable()));
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            failedFlowFiles.add(flowFile);
+        }
+        return failedFlowFiles;
+    }
+
+    /**
+     * Send unhandled items to failure and remove the flow files from key to 
flow file map
+     * @param session used for sending the flow file
+     * @param keysToFlowFileMap - ItemKeys to flow file map
+     * @param hashKeyValue the items hash key value
+     * @param rangeKeyValue the items hash key value
+     */
+    protected void sendUnprocessedToUnprocessedRelationship(final 
ProcessSession session, Map<ItemKeys, FlowFile> keysToFlowFileMap, Object 
hashKeyValue, Object rangeKeyValue) {
+        ItemKeys itemKeys = new ItemKeys(hashKeyValue, rangeKeyValue);
+
+        FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
+        flowFile = session.putAttribute(flowFile, 
DYNAMODB_KEY_ERROR_UNPROCESSED, itemKeys.toString());
+        session.transfer(flowFile,REL_UNPROCESSED);
+
+        getLogger().error("Unprocessed key " + itemKeys + " for flow file " + 
flowFile);
+
+        keysToFlowFileMap.remove(itemKeys);
+    }
+
+    protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object 
rangeKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+        boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
+        boolean isRangeValueNull = rangeKeyValue == null;
+        boolean isConsistent = true;
+        if ( ! isRangeNameBlank && (isRangeValueNull || 
StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+        if ( isRangeNameBlank &&  ( ! isRangeValueNull && ! 
StringUtils.isBlank(rangeKeyValue.toString()))) {
+            isConsistent = false;
+        }
+
+        if ( ! isConsistent ) {
+            getLogger().error("Range key name '" + rangeKeyName + "' was not 
consistent with range value "
+                + rangeKeyValue + "'" + flowFile);
+            flowFile = session.putAttribute(flowFile, 
DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
+                 + "'/value '" + rangeKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+        return isConsistent;
+
+    }
+
+    protected boolean isHashKeyValueConsistent(String hashKeyName, Object 
hashKeyValue, ProcessSession session,
+            FlowFile flowFile) {
+
+        boolean isConsistent = true;
+
+        if ( hashKeyValue == null || 
StringUtils.isBlank(hashKeyValue.toString())) {
+            getLogger().error("Hash key value '" + hashKeyValue + "' is 
required for flow file " + flowFile);
+                 flowFile = session.putAttribute(flowFile, 
DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName
+                     + "/value '" + hashKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+            isConsistent = false;
+        }
+
+        return isConsistent;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
new file mode 100644
index 0000000..e7ac523
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractWriteDynamoDBProcessor.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+public abstract class AbstractWriteDynamoDBProcessor extends 
AbstractDynamoDBProcessor {
+
+    /**
+     * Helper method to handle unprocessed items items
+     * @param session process session
+     * @param keysToFlowFileMap map of flow db primary key to flow file
+     * @param table dynamodb table
+     * @param hashKeyName the hash key name
+     * @param hashKeyValueType the hash key value
+     * @param rangeKeyName the range key name
+     * @param rangeKeyValueType range key value
+     * @param outcome the write outcome
+     */
+    protected void handleUnprocessedItems(final ProcessSession session, 
Map<ItemKeys, FlowFile> keysToFlowFileMap, final String table, final String 
hashKeyName, final String hashKeyValueType,
+            final String rangeKeyName, final String rangeKeyValueType, 
BatchWriteItemOutcome outcome) {
+        BatchWriteItemResult result = outcome.getBatchWriteItemResult();
+
+        // Handle unprocessed items
+        List<WriteRequest> unprocessedItems = 
result.getUnprocessedItems().get(table);
+        if ( unprocessedItems != null && unprocessedItems.size() > 0 ) {
+            for ( WriteRequest request : unprocessedItems) {
+                Map<String,AttributeValue> item = getRequestItem(request);
+                Object hashKeyValue = getValue(item, hashKeyName, 
hashKeyValueType);
+                Object rangeKeyValue = getValue(item, rangeKeyName, 
rangeKeyValueType);
+
+                sendUnprocessedToUnprocessedRelationship(session, 
keysToFlowFileMap, hashKeyValue, rangeKeyValue);
+            }
+        }
+    }
+
+    /**
+     * Get the request item key and attribute value
+     * @param writeRequest write request
+     * @return Map of keys and values
+     *
+     * @see PutDynamoDB#getRequestItem(WriteRequest)
+     * @see DeleteDynamoDB#getRequestItem(WriteRequest)
+     */
+    protected abstract Map<String, AttributeValue> getRequestItem(WriteRequest 
writeRequest);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
new file mode 100644
index 0000000..a82de34
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+@SupportsBatching
+@SeeAlso({GetDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Delete", "Remove"})
+@CapabilityDescription("Deletes a document from DynamoDB based on hash and 
range key. The key can be string or number."
+        + " The request requires all the primary keys for the operation (hash 
or hash and range key)")
+@WritesAttributes({
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo 
db unprocessed keys"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"Dynamod db range key error"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo 
db key not found"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"Dynamo db exception message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error 
code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db 
error message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error 
type"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db 
error service"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db 
error is retryable"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db 
error request id"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db 
status code")
+    })
+@ReadsAttributes({
+    @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items 
hash key value" ),
+    @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items 
range key value" ),
+    })
+public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, 
HASH_KEY_VALUE, RANGE_KEY_VALUE,
+                HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, BATCH_SIZE, REGION, 
ACCESS_KEY, SECRET_KEY,
+                CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, 
SSL_CONTEXT_SERVICE));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        List<FlowFile> flowFiles = 
session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+
+        final String hashKeyName = 
context.getProperty(HASH_KEY_NAME).getValue();
+        final String hashKeyValueType = 
context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
+        final String rangeKeyName = 
context.getProperty(RANGE_KEY_NAME).getValue();
+        final String rangeKeyValueType = 
context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
+
+        TableWriteItems tableWriteItems = new TableWriteItems(table);
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, 
session, flowFile)) {
+                continue;
+            }
+
+            if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, 
session, flowFile) ) {
+                continue;
+            }
+
+            if ( rangeKeyValue == null || 
StringUtils.isBlank(rangeKeyValue.toString()) ) {
+                tableWriteItems.addHashOnlyPrimaryKeysToDelete(hashKeyName, 
hashKeyValue);
+            } else {
+                tableWriteItems.addHashAndRangePrimaryKeyToDelete(hashKeyName,
+                        hashKeyValue, rangeKeyName, rangeKeyValue);
+            }
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), 
flowFile);
+        }
+
+        if ( keysToFlowFileMap.isEmpty() ) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchWriteItemOutcome outcome = 
dynamoDB.batchWriteItem(tableWriteItems);
+
+            handleUnprocessedItems(session, keysToFlowFileMap, table, 
hashKeyName, hashKeyValueType, rangeKeyName,
+               rangeKeyValueType, outcome);
+
+            // All non unprocessed items are successful
+            for (FlowFile flowFile : keysToFlowFileMap.values()) {
+                getLogger().debug("Successfully deleted item from dynamodb : " 
+ table);
+                session.transfer(flowFile,REL_SUCCESS);
+            }
+        } catch(AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service 
exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client 
exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : 
" + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected Map<String, AttributeValue> getRequestItem(WriteRequest 
writeRequest) {
+        return writeRequest.getDeleteRequest().getKey();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
new file mode 100644
index 0000000..808600c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+
+@SupportsBatching
+@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Get", "Fetch"})
+@CapabilityDescription("Retrieves a document from DynamoDB based on hash and 
range key.  The key can be string or number."
+        + "For any get request all the primary keys are required (hash or hash 
and range based on the table keys)."
+        + "A Json Document ('Map') attribute of the DynamoDB item is read into 
the content of the FlowFile.")
+@WritesAttributes({
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo 
db unprocessed keys"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"Dynamod db range key error"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo 
db key not found"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"Dynamo db exception message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error 
code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db 
error message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error 
type"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db 
error service"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db 
error is retryable"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db 
error request id"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db 
status code")
+    })
+@ReadsAttributes({
+    @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items 
hash key value" ),
+    @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items 
range key value" ),
+    })
+public class GetDynamoDB extends AbstractDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, 
HASH_KEY_VALUE, RANGE_KEY_VALUE,
+                HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, 
BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY,
+                CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, 
SSL_CONTEXT_SERVICE));
+
+    public static final Relationship REL_NOT_FOUND = new 
Relationship.Builder().name("not found")
+            .description("FlowFiles are routed to not found relationship if 
key not found in the table").build();
+
+    public static final Set<Relationship> getDynamoDBrelationships = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, 
REL_UNPROCESSED, REL_NOT_FOUND)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return getDynamoDBrelationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        List<FlowFile> flowFiles = 
session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+        TableKeysAndAttributes tableKeysAndAttributes = new 
TableKeysAndAttributes(table);
+
+        final String hashKeyName = 
context.getProperty(HASH_KEY_NAME).getValue();
+        final String rangeKeyName = 
context.getProperty(RANGE_KEY_NAME).getValue();
+        final String jsonDocument = 
context.getProperty(JSON_DOCUMENT).getValue();
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, 
session, flowFile)) {
+                continue;
+            }
+
+            if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, 
session, flowFile) ) {
+                continue;
+            }
+
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), 
flowFile);
+
+            if ( rangeKeyValue == null || 
StringUtils.isBlank(rangeKeyValue.toString()) ) {
+                tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, 
hashKeyValue);
+            } else {
+                tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, 
hashKeyValue, rangeKeyName, rangeKeyValue);
+            }
+        }
+
+        if (keysToFlowFileMap.isEmpty()) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchGetItemOutcome result = 
dynamoDB.batchGetItem(tableKeysAndAttributes);
+
+            // Handle processed items and get the json document
+            List<Item> items = result.getTableItems().get(table);
+            for (Item item : items) {
+                ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), 
item.get(rangeKeyName));
+                FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
+
+                if ( item.get(jsonDocument) != null ) {
+                    ByteArrayInputStream bais = new 
ByteArrayInputStream(item.getJSON(jsonDocument).getBytes());
+                    flowFile = session.importFrom(bais, flowFile);
+                }
+
+                session.transfer(flowFile,REL_SUCCESS);
+                keysToFlowFileMap.remove(itemKeys);
+            }
+
+            // Handle unprocessed keys
+            Map<String, KeysAndAttributes> unprocessedKeys = 
result.getUnprocessedKeys();
+            if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
+                KeysAndAttributes keysAndAttributes = 
unprocessedKeys.get(table);
+                List<Map<String, AttributeValue>> keys = 
keysAndAttributes.getKeys();
+
+                for (Map<String,AttributeValue> unprocessedKey : keys) {
+                    Object hashKeyValue = getAttributeValue(context, 
HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
+                    Object rangeKeyValue = getAttributeValue(context, 
RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
+                    sendUnprocessedToUnprocessedRelationship(session, 
keysToFlowFileMap, hashKeyValue, rangeKeyValue);
+                }
+            }
+
+            // Handle any remaining items
+            for (ItemKeys key : keysToFlowFileMap.keySet()) {
+                FlowFile flowFile = keysToFlowFileMap.get(key);
+                flowFile = session.putAttribute(flowFile, 
DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE + 
key.toString() );
+                session.transfer(flowFile,REL_NOT_FOUND);
+                keysToFlowFileMap.remove(key);
+            }
+
+        } catch(AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service 
exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client 
exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch(Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : 
" + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
new file mode 100644
index 0000000..9e53457
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/ItemKeys.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Utility class to keep a map of keys and flow files
+ */
+class ItemKeys {
+
+    protected Object hashKey = "";
+    protected Object rangeKey = "";
+
+    public ItemKeys(Object hashKey, Object rangeKey) {
+        if ( hashKey != null )
+            this.hashKey = hashKey;
+        if ( rangeKey != null )
+            this.rangeKey = rangeKey;
+    }
+
+    @Override
+    public String toString() {
+        return 
ToStringBuilder.reflectionToString(this,ToStringStyle.SHORT_PREFIX_STYLE);
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this, false);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return EqualsBuilder.reflectionEquals(this, other, false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
new file mode 100644
index 0000000..83fea37
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+@SupportsBatching
+@SeeAlso({DeleteDynamoDB.class, GetDynamoDB.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"Amazon", "DynamoDB", "AWS", "Put", "Insert"})
+@CapabilityDescription("Puts a document from DynamoDB based on hash and range 
key.  The table can have either hash and range or hash key alone."
+    + " Currently the keys supported are string and number and value can be 
json document. "
+    + "In case of hash and range keys both key are required for the operation."
+    + " The FlowFile content must be JSON. FlowFile content is mapped to the 
specified Json Document attribute in the DynamoDB item.")
+@WritesAttributes({
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED, description = "Dynamo 
db unprocessed keys"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR, description = 
"Dynamod db range key error"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND, description = "Dynamo 
db key not found"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE, description = 
"Dynamo db exception message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE, description = "Dynamo db error 
code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE, description = "Dynamo db 
error message"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE, description = "Dynamo db error 
type"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE, description = "Dynamo db 
error service"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE, description = "Dynamo db 
error is retryable"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID, description = "Dynamo db 
error request id"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE, description = "Dynamo db 
error status code"),
+    @WritesAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_IO_ERROR, description = "IO exception 
message on creating item")
+})
+@ReadsAttributes({
+    @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_HASH_KEY_VALUE, description = "Items 
hash key value"),
+    @ReadsAttribute(attribute = 
AbstractDynamoDBProcessor.DYNAMODB_ITEM_RANGE_KEY_VALUE, description = "Items 
range key value")
+})
+public class PutDynamoDB extends AbstractWriteDynamoDBProcessor {
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+        Arrays.asList(TABLE, HASH_KEY_NAME, RANGE_KEY_NAME, HASH_KEY_VALUE, 
RANGE_KEY_VALUE,
+            HASH_KEY_VALUE_TYPE, RANGE_KEY_VALUE_TYPE, JSON_DOCUMENT, 
DOCUMENT_CHARSET, BATCH_SIZE,
+            REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE));
+
+    /**
+     * Dyamodb max item size limit 400 kb
+     */
+    public static final int DYNAMODB_MAX_ITEM_SIZE = 400 * 1024;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        List<FlowFile> flowFiles = 
session.get(context.getProperty(BATCH_SIZE).asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        Map<ItemKeys, FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String table = context.getProperty(TABLE).getValue();
+
+        final String hashKeyName = 
context.getProperty(HASH_KEY_NAME).getValue();
+        final String hashKeyValueType = 
context.getProperty(HASH_KEY_VALUE_TYPE).getValue();
+        final String rangeKeyName = 
context.getProperty(RANGE_KEY_NAME).getValue();
+        final String rangeKeyValueType = 
context.getProperty(RANGE_KEY_VALUE_TYPE).getValue();
+        final String jsonDocument = 
context.getProperty(JSON_DOCUMENT).getValue();
+        final String charset = 
context.getProperty(DOCUMENT_CHARSET).getValue();
+
+        TableWriteItems tableWriteItems = new TableWriteItems(table);
+
+        for (FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile);
+            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+
+            if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, 
flowFile)) {
+                continue;
+            }
+
+            if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, 
session, flowFile)) {
+                continue;
+            }
+
+            if (!isDataValid(flowFile, jsonDocument)) {
+                flowFile = session.putAttribute(flowFile, 
AWS_DYNAMO_DB_ITEM_SIZE_ERROR, "Max size of item + attribute should be 400kb 
but was " + flowFile.getSize() + jsonDocument.length());
+                session.transfer(flowFile, REL_FAILURE);
+                continue;
+            }
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            session.exportTo(flowFile, baos);
+
+            try {
+                if (rangeKeyValue == null || 
StringUtils.isBlank(rangeKeyValue.toString())) {
+                    tableWriteItems.addItemToPut(new 
Item().withKeyComponent(hashKeyName, hashKeyValue)
+                        .withJSON(jsonDocument, 
IOUtils.toString(baos.toByteArray(), charset)));
+                } else {
+                    tableWriteItems.addItemToPut(new 
Item().withKeyComponent(hashKeyName, hashKeyValue)
+                        .withKeyComponent(rangeKeyName, rangeKeyValue)
+                        .withJSON(jsonDocument, 
IOUtils.toString(baos.toByteArray(), charset)));
+                }
+            } catch (IOException ioe) {
+                getLogger().error("IOException while creating put item : " + 
ioe.getMessage());
+                flowFile = session.putAttribute(flowFile, 
DYNAMODB_ITEM_IO_ERROR, ioe.getMessage());
+                session.transfer(flowFile, REL_FAILURE);
+            }
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), 
flowFile);
+        }
+
+        if (keysToFlowFileMap.isEmpty()) {
+            return;
+        }
+
+        final DynamoDB dynamoDB = getDynamoDB();
+
+        try {
+            BatchWriteItemOutcome outcome = 
dynamoDB.batchWriteItem(tableWriteItems);
+
+            handleUnprocessedItems(session, keysToFlowFileMap, table, 
hashKeyName, hashKeyValueType, rangeKeyName,
+                rangeKeyValueType, outcome);
+
+            // Handle any remaining flowfiles
+            for (FlowFile flowFile : keysToFlowFileMap.values()) {
+                getLogger().debug("Successful posted items to dynamodb : " + 
table);
+                session.transfer(flowFile, REL_SUCCESS);
+            }
+        } catch (AmazonServiceException exception) {
+            getLogger().error("Could not process flowFiles due to service 
exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processServiceException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch (AmazonClientException exception) {
+            getLogger().error("Could not process flowFiles due to client 
exception : " + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processClientException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        } catch (Exception exception) {
+            getLogger().error("Could not process flowFiles due to exception : 
" + exception.getMessage());
+            List<FlowFile> failedFlowFiles = processException(session, 
flowFiles, exception);
+            session.transfer(failedFlowFiles, REL_FAILURE);
+        }
+    }
+
+    private boolean isDataValid(FlowFile flowFile, String jsonDocument) {
+        return (flowFile.getSize() + jsonDocument.length()) < 
DYNAMODB_MAX_ITEM_SIZE;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    protected Map<String, AttributeValue> getRequestItem(WriteRequest 
writeRequest) {
+        return writeRequest.getPutRequest().getItem();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 2d5460f..9eb1c7b 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,5 +19,8 @@ org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS
 org.apache.nifi.processors.aws.sqs.DeleteSQS
-org.apache.nifi.processors.aws.lambda.PutLambda
+org.apache.nifi.processors.aws.lambda.PutLambda
 org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
+org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
+org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
+org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB

http://git-wip-us.apache.org/repos/asf/nifi/blob/0f115a5c/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java
new file mode 100644
index 0000000..fa0e605
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDBTest.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.DeleteRequest;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+public class DeleteDynamoDBTest {
+
+    protected DeleteDynamoDB deleteDynamoDB;
+    protected BatchWriteItemResult result = new BatchWriteItemResult();
+    BatchWriteItemOutcome outcome;
+
+    @Before
+    public void setUp() {
+        outcome = new BatchWriteItemOutcome(result);
+        result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                return outcome;
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteOnlyHashFailure() {
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
 1);
+
+        List<MockFlowFile> flowFiles = 
deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile);
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteSuccessfulWithMock() {
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS,
 1);
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangeDeleteSuccessfulWithMockOneUnprocessed() {
+        Map<String, List<WriteRequest>> unprocessed =
+                new HashMap<String, List<WriteRequest>>();
+        DeleteRequest delete = new DeleteRequest();
+        delete.addKeyEntry("hashS", new AttributeValue("h1"));
+        delete.addKeyEntry("rangeS", new AttributeValue("r1"));
+        WriteRequest write = new WriteRequest(delete);
+        List<WriteRequest> writes = new ArrayList<>();
+        writes.add(write);
+        unprocessed.put(stringHashStringRangeTableName, writes);
+        result.setUnprocessedItems(unprocessed);
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED,
 1);
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteNoHashValueFailure() {
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
 1);
+
+        List<MockFlowFile> flowFiles = 
deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangeDeleteOnlyHashWithRangeValueNoRangeNameFailure() {
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
 1);
+
+        List<MockFlowFile> flowFiles = 
deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangeDeleteOnlyHashWithRangeNameNoRangeValueFailure() {
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
 1);
+
+        List<MockFlowFile> flowFiles = 
deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteNonExistentHashSuccess() {
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"nonexistent");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS,
 1);
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteNonExistentRangeSuccess() {
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"nonexistent");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS,
 1);
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsServiceException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                throw new AmazonServiceException("serviceException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
 1);
+        List<MockFlowFile> flowFiles = 
deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("serviceException (Service: null; Status Code: 0; 
Error Code: null; Request ID: null)", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsClientException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                throw new AmazonClientException("clientException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
 1);
+        List<MockFlowFile> flowFiles = 
deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("clientException", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeDeleteThrowsRuntimeException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                throw new RuntimeException("runtimeException");
+            }
+        };
+
+        deleteDynamoDB = new DeleteDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(deleteDynamoDB);
+
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        deleteRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        
deleteRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
 1);
+        List<MockFlowFile> flowFiles = 
deleteRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("runtimeException", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+}

Reply via email to