Repository: nifi
Updated Branches:
  refs/heads/master 41583e6dc -> e3155a8a4


http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
new file mode 100644
index 0000000..e360c84
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchGetItemResult;
+import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+import com.amazonaws.util.json.JSONException;
+import com.amazonaws.util.json.JSONObject;
+
+public class GetDynamoDBTest {
+    protected GetDynamoDB getDynamoDB;
+    protected BatchGetItemOutcome outcome;
+    protected BatchGetItemResult result = new BatchGetItemResult();
+    private HashMap unprocessed;
+
+    @Before
+    public void setUp() {
+        outcome = new BatchGetItemOutcome(result);
+        KeysAndAttributes kaa = new KeysAndAttributes();
+        Map<String,AttributeValue> map = new HashMap<>();
+        map.put("hashS", new AttributeValue("h1"));
+        map.put("rangeS", new AttributeValue("r1"));
+        kaa.withKeys(map);
+        unprocessed = new HashMap<>();
+        unprocessed.put(stringHashStringRangeTableName, kaa);
+
+        result.withUnprocessedKeys(unprocessed);
+
+        Map<String,List<Map<String,AttributeValue>>> responses = new 
HashMap<>();
+        List<Map<String,AttributeValue>> items = new ArrayList<>();
+        responses.put("StringHashStringRangeTable", items);
+        result.withResponses(responses);
+
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+
+            @Override
+            public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... 
tableKeysAndAttributes) {
+                return outcome;
+            }
+
+        };
+
+        getDynamoDB = new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetUnprocessed() {
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED,
 1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetJsonObjectNull() {
+        outcome = new BatchGetItemOutcome(result);
+        KeysAndAttributes kaa = new KeysAndAttributes();
+        Map<String,AttributeValue> map = new HashMap<>();
+        map.put("hashS", new AttributeValue("h1"));
+        map.put("rangeS", new AttributeValue("r1"));
+        kaa.withKeys(map);
+        unprocessed = new HashMap<>();
+        result.withUnprocessedKeys(unprocessed);
+
+        Map<String,List<Map<String,AttributeValue>>> responses = new 
HashMap<>();
+        List<Map<String,AttributeValue>> items = new ArrayList<>();
+        Map<String,AttributeValue> item = new HashMap<String,AttributeValue>();
+        item.put("j1",null);
+        item.put("hashS", new AttributeValue("h1"));
+        item.put("rangeS", new AttributeValue("r1"));
+        items.add(item);
+        responses.put("StringHashStringRangeTable", items);
+        result.withResponses(responses);
+
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+
+            @Override
+            public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... 
tableKeysAndAttributes) {
+                return outcome;
+            }
+
+        };
+
+        getDynamoDB = new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertNull(flowFile.getContentClaim());
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetJsonObjectValid() throws 
IOException, JSONException {
+        outcome = new BatchGetItemOutcome(result);
+        KeysAndAttributes kaa = new KeysAndAttributes();
+        Map<String,AttributeValue> map = new HashMap<>();
+        map.put("hashS", new AttributeValue("h1"));
+        map.put("rangeS", new AttributeValue("r1"));
+        kaa.withKeys(map);
+        unprocessed = new HashMap<>();
+        result.withUnprocessedKeys(unprocessed);
+
+        Map<String,List<Map<String,AttributeValue>>> responses = new 
HashMap<>();
+        List<Map<String,AttributeValue>> items = new ArrayList<>();
+        Map<String,AttributeValue> item = new HashMap<String,AttributeValue>();
+        String jsonDocument = new JSONObject().put("name", "john").toString();
+        item.put("j1",new AttributeValue(jsonDocument));
+        item.put("hashS", new AttributeValue("h1"));
+        item.put("rangeS", new AttributeValue("r1"));
+        items.add(item);
+        responses.put("StringHashStringRangeTable", items);
+        result.withResponses(responses);
+
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+
+            @Override
+            public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... 
tableKeysAndAttributes) {
+                return outcome;
+            }
+
+        };
+
+        getDynamoDB = new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
+    }
+
+    @Test
+    public void testStringHashStringRangeGetThrowsServiceException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+
+            @Override
+            public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... 
tableKeysAndAttributes) {
+                throw new AmazonServiceException("serviceException");
+            }
+
+        };
+
+        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("serviceException (Service: null; Status Code: 0; 
Error Code: null; Request ID: null)", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetThrowsRuntimeException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+
+            @Override
+            public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... 
tableKeysAndAttributes) {
+                throw new RuntimeException("runtimeException");
+            }
+
+        };
+
+        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("runtimeException", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetThrowsClientException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+
+            @Override
+            public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... 
tableKeysAndAttributes) {
+                throw new AmazonClientException("clientException");
+            }
+
+        };
+
+        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("clientException", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetNotFound() {
+        result.clearResponsesEntries();
+        result.clearUnprocessedKeysEntries();
+
+        final BatchGetItemOutcome notFoundOutcome = new 
BatchGetItemOutcome(result);
+        Map<String,List<Map<String,AttributeValue>>> responses = new 
HashMap<>();
+        List<Map<String,AttributeValue>> items = new ArrayList<>();
+        responses.put(stringHashStringRangeTableName, items);
+        result.withResponses(responses);
+
+        final DynamoDB notFoundMockDynamoDB = new 
DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... 
tableKeysAndAttributes) {
+                return notFoundOutcome;
+            }
+        };
+
+        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return notFoundMockDynamoDB;
+            }
+        };
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        getRunner.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetOnlyHashFailure() {
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile);
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangeGetNoHashValueFailure() {
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangeGetOnlyHashWithRangeValueNoRangeNameFailure() {
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangeGetOnlyHashWithRangeNameNoRangeValueFailure() {
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+    }
+
+    // Incorporated test from James W
+    @Test
+    public void testStringHashStringNoRangeGetUnprocessed() {
+        unprocessed.clear();
+        KeysAndAttributes kaa = new KeysAndAttributes();
+        Map<String,AttributeValue> map = new HashMap<>();
+        map.put("hashS", new AttributeValue("h1"));
+        kaa.withKeys(map);
+        unprocessed.put(stringHashStringRangeTableName, kaa);
+
+        final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED,
 1);
+
+        List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java
new file mode 100644
index 0000000..238e2e7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.FileInputStream;
+import java.util.ArrayList;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.DeleteTableResult;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+
+public class ITAbstractDynamoDBTest {
+
+    protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+    protected static DynamoDB dynamoDB;
+    protected static AmazonDynamoDBClient amazonDynamoDBClient;
+    protected static String stringHashStringRangeTableName = 
"StringHashStringRangeTable";
+    protected static String numberHashNumberRangeTableName = 
"NumberHashNumberRangeTable";
+    protected static String numberHashOnlyTableName = "NumberHashOnlyTable";
+    protected final static String REGION = "us-west-2";
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
+        final PropertiesCredentials credentials = new 
PropertiesCredentials(fis);
+        amazonDynamoDBClient = new AmazonDynamoDBClient(credentials);
+        dynamoDB = new DynamoDB(amazonDynamoDBClient);
+        amazonDynamoDBClient.setRegion(Region.getRegion(Regions.US_WEST_2));
+
+        ArrayList<AttributeDefinition> attributeDefinitions = new 
ArrayList<AttributeDefinition>();
+        attributeDefinitions
+            .add(new 
AttributeDefinition().withAttributeName("hashS").withAttributeType("S"));
+        attributeDefinitions
+            .add(new 
AttributeDefinition().withAttributeName("rangeS").withAttributeType("S"));
+
+        ArrayList<KeySchemaElement> keySchema = new 
ArrayList<KeySchemaElement>();
+        keySchema.add(new 
KeySchemaElement().withAttributeName("hashS").withKeyType(KeyType.HASH));
+        keySchema.add(new 
KeySchemaElement().withAttributeName("rangeS").withKeyType(KeyType.RANGE));
+
+        CreateTableRequest request = new CreateTableRequest()
+            .withTableName(stringHashStringRangeTableName)
+            .withKeySchema(keySchema)
+            .withAttributeDefinitions(attributeDefinitions)
+            .withProvisionedThroughput(new ProvisionedThroughput()
+                .withReadCapacityUnits(5L)
+                .withWriteCapacityUnits(6L));
+        Table stringHashStringRangeTable = dynamoDB.createTable(request);
+        stringHashStringRangeTable.waitForActive();
+
+        attributeDefinitions = new ArrayList<AttributeDefinition>();
+        attributeDefinitions
+            .add(new 
AttributeDefinition().withAttributeName("hashN").withAttributeType("N"));
+        attributeDefinitions
+            .add(new 
AttributeDefinition().withAttributeName("rangeN").withAttributeType("N"));
+
+        keySchema = new ArrayList<KeySchemaElement>();
+        keySchema.add(new 
KeySchemaElement().withAttributeName("hashN").withKeyType(KeyType.HASH));
+        keySchema.add(new 
KeySchemaElement().withAttributeName("rangeN").withKeyType(KeyType.RANGE));
+
+        request = new CreateTableRequest()
+            .withTableName(numberHashNumberRangeTableName)
+            .withKeySchema(keySchema)
+            .withAttributeDefinitions(attributeDefinitions)
+            .withProvisionedThroughput(new ProvisionedThroughput()
+                .withReadCapacityUnits(5L)
+                .withWriteCapacityUnits(6L));
+        Table numberHashNumberRangeTable = dynamoDB.createTable(request);
+        numberHashNumberRangeTable.waitForActive();
+
+        attributeDefinitions = new ArrayList<AttributeDefinition>();
+        attributeDefinitions
+            .add(new 
AttributeDefinition().withAttributeName("hashN").withAttributeType("N"));
+
+        keySchema = new ArrayList<KeySchemaElement>();
+        keySchema.add(new 
KeySchemaElement().withAttributeName("hashN").withKeyType(KeyType.HASH));
+
+        request = new CreateTableRequest()
+            .withTableName(numberHashOnlyTableName)
+            .withKeySchema(keySchema)
+            .withAttributeDefinitions(attributeDefinitions)
+            .withProvisionedThroughput(new ProvisionedThroughput()
+                .withReadCapacityUnits(5L)
+                .withWriteCapacityUnits(6L));
+        Table numberHashOnlyTable = dynamoDB.createTable(request);
+        numberHashOnlyTable.waitForActive();
+
+    }
+
+    protected static void validateServiceExceptionAttribute(FlowFile flowFile) 
{
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE));
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        DeleteTableResult result = 
amazonDynamoDBClient.deleteTable(stringHashStringRangeTableName);
+        result = 
amazonDynamoDBClient.deleteTable(numberHashNumberRangeTableName);
+        result = amazonDynamoDBClient.deleteTable(numberHashOnlyTableName);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
new file mode 100644
index 0000000..bbd341c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public class ITPutGetDeleteGetDynamoDBTest extends ITAbstractDynamoDBTest {
+
+
+    @Test
+    public void testStringHashStringRangePutGetDeleteGetSuccess() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS,
 1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
+
+        flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
+        deleteRunner.setProperty(DeleteDynamoDB.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_NAME, "rangeS");
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "h1");
+        deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_VALUE, "r1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 
1);
+
+        flowFiles = 
deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals("", new String(flowFile.toByteArray()));
+        }
+
+        // Final check after delete
+        final TestRunner getRunnerAfterDelete = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, 
REGION);
+        getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        getRunnerAfterDelete.enqueue(new byte[] {});
+
+        getRunnerAfterDelete.run(1);
+        
getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 
1);
+
+        flowFiles = 
getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND);
+        for (MockFlowFile flowFile : flowFiles) {
+            String error = 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND);
+            
assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutDeleteWithHashOnlyFailure() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS,
 1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
+
+        flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
+        deleteRunner.setProperty(DeleteDynamoDB.TABLE, 
stringHashStringRangeTableName);
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashS");
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "h1");
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_FAILURE, 
1);
+
+        flowFiles = 
deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            validateServiceExceptionAttribute(flowFile);
+            assertEquals("", new String(flowFile.toByteArray()));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutGetWithHashOnlyKeyFailure() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, 
"h1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, 
"r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS,
 1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            validateServiceExceptionAttribute(flowFile);
+            assertEquals("", new String(flowFile.toByteArray()));
+        }
+
+
+    }
+
+    @Test
+    public void testNumberHashOnlyPutGetDeleteGetSuccess() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, 
numberHashOnlyTableName);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, 
"hashN");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, 
"40");
+        
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"age\":40}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS,
 1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
numberHashOnlyTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashN");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
+
+        flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
+        deleteRunner.setProperty(DeleteDynamoDB.TABLE, 
numberHashOnlyTableName);
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashN");
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "40");
+        
deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 
1);
+
+        flowFiles = 
deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals("", new String(flowFile.toByteArray()));
+        }
+
+        // Final check after delete
+        final TestRunner getRunnerAfterDelete = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, 
REGION);
+        getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, 
numberHashOnlyTableName);
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashN");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"40");
+        
getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE,
 AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        getRunnerAfterDelete.enqueue(new byte[] {});
+
+        getRunnerAfterDelete.run(1);
+        
getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 
1);
+
+        flowFiles = 
getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND);
+        for (MockFlowFile flowFile : flowFiles) {
+            String error = 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND);
+            
assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testNumberHashNumberRangePutGetDeleteGetSuccess() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, 
numberHashNumberRangeTableName);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, 
"hashN");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeN");
+        
putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        
putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, 
"40");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, 
"50");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"40\":\"50\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS,
 1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner getRunner = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
numberHashNumberRangeTableName);
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashN");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeN");
+        getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40");
+        getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "50");
+        
getRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        
getRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        getRunner.enqueue(new byte[] {});
+
+        getRunner.run(1);
+
+        
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
+
+        flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+        final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
+
+        deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION);
+        deleteRunner.setProperty(DeleteDynamoDB.TABLE, 
numberHashNumberRangeTableName);
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashN");
+        deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_NAME, "rangeN");
+        deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "40");
+        deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_VALUE, "50");
+        
deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        
deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, 
AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        deleteRunner.enqueue(new byte[] {});
+
+        deleteRunner.run(1);
+
+        deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 
1);
+
+        flowFiles = 
deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals("", new String(flowFile.toByteArray()));
+        }
+
+        // Final check after delete
+        final TestRunner getRunnerAfterDelete = 
TestRunners.newTestRunner(GetDynamoDB.class);
+
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+        getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, 
REGION);
+        getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, 
numberHashNumberRangeTableName);
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashN");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeN");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, 
"40");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, 
"50");
+        
getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        
getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE,
 AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        
getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE,
 AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER);
+        getRunnerAfterDelete.enqueue(new byte[] {});
+
+        getRunnerAfterDelete.run(1);
+        
getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 
1);
+
+        flowFiles = 
getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND);
+        for (MockFlowFile flowFile : flowFiles) {
+            String error = 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND);
+            
assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java
new file mode 100644
index 0000000..62b970b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class ItemKeysTest {
+
+    @Test
+    public void testHashNullRangeNullEquals() {
+        ItemKeys ik1 = new ItemKeys(null, null);
+        ItemKeys ik2 = new ItemKeys(null, null);
+        assertEquals(ik1, ik2);
+        assertEquals(ik1.hashCode(), ik2.hashCode());
+        assertEquals(ik1.toString(), ik2.toString());
+    }
+
+    @Test
+    public void testHashNotNullRangeNullEquals() {
+        ItemKeys ik1 = new ItemKeys("abc", null);
+        ItemKeys ik2 = new ItemKeys("abc", null);
+        assertEquals(ik1, ik2);
+        assertEquals(ik1.hashCode(), ik2.hashCode());
+        assertEquals(ik1.toString(), ik2.toString());
+    }
+
+    @Test
+    public void testHashNullRangeNotNullEquals() {
+        ItemKeys ik1 = new ItemKeys(null, "ab");
+        ItemKeys ik2 = new ItemKeys(null, "ab");
+        assertEquals(ik1, ik2);
+        assertEquals(ik1.hashCode(), ik2.hashCode());
+        assertEquals(ik1.toString(), ik2.toString());
+    }
+
+    @Test
+    public void testHashNotNullRangeNotNullEquals() {
+        ItemKeys ik1 = new ItemKeys("abc", "pqr");
+        ItemKeys ik2 = new ItemKeys("abc", "pqr");
+        assertEquals(ik1, ik2);
+        assertEquals(ik1.hashCode(), ik2.hashCode());
+        assertEquals(ik1.toString(), ik2.toString());
+    }
+
+    @Test
+    public void testHashNotNullRangeNotNullForOtherNotEquals() {
+        ItemKeys ik1 = new ItemKeys(null, "ab");
+        ItemKeys ik2 = new ItemKeys("ab", null);
+        assertFalse(ik1.equals(ik2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java
new file mode 100644
index 0000000..39b5609
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.dynamodb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult;
+import com.amazonaws.services.dynamodbv2.model.PutRequest;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+
+public class PutDynamoDBTest  {
+
+    protected PutDynamoDB putDynamoDB;
+    protected BatchWriteItemResult result = new BatchWriteItemResult();
+    BatchWriteItemOutcome outcome;
+
+    @Before
+    public void setUp() {
+        outcome = new BatchWriteItemOutcome(result);
+        result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>());
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                return outcome;
+            }
+        };
+
+        putDynamoDB = new PutDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutOnlyHashFailure() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"hello\": 2}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile);
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutNoHashValueFailure() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"hello\": 2}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangePutOnlyHashWithRangeValueNoRangeNameFailure() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        putRunner.enqueue(new byte[] {});
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangePutOnlyHashWithRangeNameNoRangeValueFailure() {
+        final TestRunner putRunner = 
TestRunners.newTestRunner(PutDynamoDB.class);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1");
+        putRunner.enqueue(new byte[] {});
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR));
+        }
+    }
+
+    @Test
+    public void testStringHashStringRangePutSuccessfulWithMock() {
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS,
 1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+
+    }
+
+    @Test
+    public void 
testStringHashStringRangePutOneSuccessfulOneSizeFailureWithMockBatchSize1() {
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1];
+        for (int i = 0; i < item.length; i++) {
+            item[i] = 'a';
+        }
+        String document2 = new String(item);
+        putRunner.enqueue(document2.getBytes());
+
+        putRunner.run(2,true,true);
+
+        List<MockFlowFile> flowFilesFailed = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFilesFailed) {
+            System.out.println(flowFile.getAttributes());
+            
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
+            assertEquals(item.length,flowFile.getSize());
+        }
+
+        List<MockFlowFile> flowFilesSuccessful = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesSuccessful) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+    }
+
+    @Test
+    public void 
testStringHashStringRangePutOneSuccessfulOneSizeFailureWithMockBatchSize5() {
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+        putRunner.setProperty(AbstractDynamoDBProcessor.BATCH_SIZE, "5");
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1];
+        for (int i = 0; i < item.length; i++) {
+            item[i] = 'a';
+        }
+        String document2 = new String(item);
+        putRunner.enqueue(document2.getBytes());
+
+        putRunner.run(1);
+
+        List<MockFlowFile> flowFilesFailed = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFilesFailed) {
+            System.out.println(flowFile.getAttributes());
+            
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
+            assertEquals(item.length,flowFile.getSize());
+        }
+
+        List<MockFlowFile> flowFilesSuccessful = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesSuccessful) {
+            System.out.println(flowFile.getAttributes());
+            assertEquals(document, new String(flowFile.toByteArray()));
+        }
+    }
+
+    @Test
+    public void 
testStringHashStringRangePutFailedWithItemSizeGreaterThan400Kb() {
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1];
+        for (int i = 0; i < item.length; i++) {
+            item[i] = 'a';
+        }
+        String document = new String(item);
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_FAILURE,
 1);
+
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE);
+        assertEquals(1,flowFiles.size());
+        for (MockFlowFile flowFile : flowFiles) {
+            System.out.println(flowFile.getAttributes());
+            
flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR);
+            assertEquals(item.length,flowFile.getSize());
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutThrowsServiceException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                throw new AmazonServiceException("serviceException");
+            }
+        };
+
+        putDynamoDB = new PutDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("serviceException (Service: null; Status Code: 0; 
Error Code: null; Request ID: null)", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutThrowsClientException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                throw new AmazonClientException("clientException");
+            }
+        };
+
+        putDynamoDB = new PutDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("clientException", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutThrowsRuntimeException() {
+        final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) {
+            @Override
+            public BatchWriteItemOutcome batchWriteItem(TableWriteItems... 
tableWriteItems) {
+                throw new RuntimeException("runtimeException");
+            }
+        };
+
+        putDynamoDB = new PutDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+        };
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, 
"document");
+        String document = "{\"name\":\"john\"}";
+        putRunner.enqueue(document.getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
+        List<MockFlowFile> flowFiles = 
putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
+        for (MockFlowFile flowFile : flowFiles) {
+            assertEquals("runtimeException", 
flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE));
+        }
+
+    }
+
+    @Test
+    public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() 
{
+        Map<String, List<WriteRequest>> unprocessed =
+                new HashMap<String, List<WriteRequest>>();
+        PutRequest put = new PutRequest();
+        put.addItemEntry("hashS", new AttributeValue("h1"));
+        put.addItemEntry("rangeS", new AttributeValue("r1"));
+        WriteRequest write = new WriteRequest(put);
+        List<WriteRequest> writes = new ArrayList<>();
+        writes.add(write);
+        unprocessed.put(stringHashStringRangeTableName, writes);
+        result.setUnprocessedItems(unprocessed);
+        final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB);
+
+        putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
+        putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef");
+        putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION);
+        putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, 
stringHashStringRangeTableName);
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, 
"hashS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, 
"rangeS");
+        putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1");
+        putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2");
+        putRunner.enqueue("{\"hello\":\"world\"}".getBytes());
+
+        putRunner.run(1);
+
+        
putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED,
 1);
+
+    }
+
+}

Reply via email to