Repository: nifi Updated Branches: refs/heads/master 41583e6dc -> e3155a8a4
http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java new file mode 100644 index 0000000..e360c84 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION; +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.BatchGetItemResult; +import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes; +import com.amazonaws.util.json.JSONException; +import com.amazonaws.util.json.JSONObject; + +public class GetDynamoDBTest { + protected GetDynamoDB getDynamoDB; + protected BatchGetItemOutcome outcome; + protected BatchGetItemResult result = new BatchGetItemResult(); + private HashMap unprocessed; + + @Before + public void setUp() { + outcome = new BatchGetItemOutcome(result); + KeysAndAttributes kaa = new KeysAndAttributes(); + Map<String,AttributeValue> map = new HashMap<>(); + map.put("hashS", new AttributeValue("h1")); + map.put("rangeS", new AttributeValue("r1")); + kaa.withKeys(map); + unprocessed = new HashMap<>(); + unprocessed.put(stringHashStringRangeTableName, kaa); + + result.withUnprocessedKeys(unprocessed); + + Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>(); + List<Map<String,AttributeValue>> items = new ArrayList<>(); + responses.put("StringHashStringRangeTable", items); + result.withResponses(responses); + + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + + @Override + public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) { + return outcome; + } + + }; + + getDynamoDB = new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + + } + + @Test + public void testStringHashStringRangeGetUnprocessed() { + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED)); + } + + } + + @Test + public void testStringHashStringRangeGetJsonObjectNull() { + outcome = new BatchGetItemOutcome(result); + KeysAndAttributes kaa = new KeysAndAttributes(); + Map<String,AttributeValue> map = new HashMap<>(); + map.put("hashS", new AttributeValue("h1")); + map.put("rangeS", new AttributeValue("r1")); + kaa.withKeys(map); + unprocessed = new HashMap<>(); + result.withUnprocessedKeys(unprocessed); + + Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>(); + List<Map<String,AttributeValue>> items = new ArrayList<>(); + Map<String,AttributeValue> item = new HashMap<String,AttributeValue>(); + item.put("j1",null); + item.put("hashS", new AttributeValue("h1")); + item.put("rangeS", new AttributeValue("r1")); + items.add(item); + responses.put("StringHashStringRangeTable", items); + result.withResponses(responses); + + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + + @Override + public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) { + return outcome; + } + + }; + + getDynamoDB = new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + assertNull(flowFile.getContentClaim()); + } + + } + + @Test + public void testStringHashStringRangeGetJsonObjectValid() throws IOException, JSONException { + outcome = new BatchGetItemOutcome(result); + KeysAndAttributes kaa = new KeysAndAttributes(); + Map<String,AttributeValue> map = new HashMap<>(); + map.put("hashS", new AttributeValue("h1")); + map.put("rangeS", new AttributeValue("r1")); + kaa.withKeys(map); + unprocessed = new HashMap<>(); + result.withUnprocessedKeys(unprocessed); + + Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>(); + List<Map<String,AttributeValue>> items = new ArrayList<>(); + Map<String,AttributeValue> item = new HashMap<String,AttributeValue>(); + String jsonDocument = new JSONObject().put("name", "john").toString(); + item.put("j1",new AttributeValue(jsonDocument)); + item.put("hashS", new AttributeValue("h1")); + item.put("rangeS", new AttributeValue("r1")); + items.add(item); + responses.put("StringHashStringRangeTable", items); + result.withResponses(responses); + + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + + @Override + public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) { + return outcome; + } + + }; + + getDynamoDB = new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); + } + + @Test + public void testStringHashStringRangeGetThrowsServiceException() { + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + + @Override + public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) { + throw new AmazonServiceException("serviceException"); + } + + }; + + final GetDynamoDB getDynamoDB = new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null)", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + } + + } + + @Test + public void testStringHashStringRangeGetThrowsRuntimeException() { + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + + @Override + public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) { + throw new RuntimeException("runtimeException"); + } + + }; + + final GetDynamoDB getDynamoDB = new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + } + + } + + @Test + public void testStringHashStringRangeGetThrowsClientException() { + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + + @Override + public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) { + throw new AmazonClientException("clientException"); + } + + }; + + final GetDynamoDB getDynamoDB = new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + } + + } + + @Test + public void testStringHashStringRangeGetNotFound() { + result.clearResponsesEntries(); + result.clearUnprocessedKeysEntries(); + + final BatchGetItemOutcome notFoundOutcome = new BatchGetItemOutcome(result); + Map<String,List<Map<String,AttributeValue>>> responses = new HashMap<>(); + List<Map<String,AttributeValue>> items = new ArrayList<>(); + responses.put(stringHashStringRangeTableName, items); + result.withResponses(responses); + + final DynamoDB notFoundMockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + @Override + public BatchGetItemOutcome batchGetItem(TableKeysAndAttributes... tableKeysAndAttributes) { + return notFoundOutcome; + } + }; + + final GetDynamoDB getDynamoDB = new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return notFoundMockDynamoDB; + } + }; + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND)); + } + + } + + @Test + public void testStringHashStringRangeGetOnlyHashFailure() { + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile); + } + + } + + @Test + public void testStringHashStringRangeGetNoHashValueFailure() { + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR)); + } + + } + + @Test + public void testStringHashStringRangeGetOnlyHashWithRangeValueNoRangeNameFailure() { + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR)); + } + + } + + @Test + public void testStringHashStringRangeGetOnlyHashWithRangeNameNoRangeValueFailure() { + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR)); + } + } + + // Incorporated test from James W + @Test + public void testStringHashStringNoRangeGetUnprocessed() { + unprocessed.clear(); + KeysAndAttributes kaa = new KeysAndAttributes(); + Map<String,AttributeValue> map = new HashMap<>(); + map.put("hashS", new AttributeValue("h1")); + kaa.withKeys(map); + unprocessed.put(stringHashStringRangeTableName, kaa); + + final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); + + getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + getRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1); + + List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED)); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java new file mode 100644 index 0000000..238e2e7 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITAbstractDynamoDBTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import static org.junit.Assert.assertNotNull; + +import java.io.FileInputStream; +import java.util.ArrayList; + +import org.apache.nifi.flowfile.FlowFile; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.DeleteTableResult; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; + +public class ITAbstractDynamoDBTest { + + protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + protected static DynamoDB dynamoDB; + protected static AmazonDynamoDBClient amazonDynamoDBClient; + protected static String stringHashStringRangeTableName = "StringHashStringRangeTable"; + protected static String numberHashNumberRangeTableName = "NumberHashNumberRangeTable"; + protected static String numberHashOnlyTableName = "NumberHashOnlyTable"; + protected final static String REGION = "us-west-2"; + + @BeforeClass + public static void beforeClass() throws Exception { + FileInputStream fis = new FileInputStream(CREDENTIALS_FILE); + final PropertiesCredentials credentials = new PropertiesCredentials(fis); + amazonDynamoDBClient = new AmazonDynamoDBClient(credentials); + dynamoDB = new DynamoDB(amazonDynamoDBClient); + amazonDynamoDBClient.setRegion(Region.getRegion(Regions.US_WEST_2)); + + ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); + attributeDefinitions + .add(new AttributeDefinition().withAttributeName("hashS").withAttributeType("S")); + attributeDefinitions + .add(new AttributeDefinition().withAttributeName("rangeS").withAttributeType("S")); + + ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); + keySchema.add(new KeySchemaElement().withAttributeName("hashS").withKeyType(KeyType.HASH)); + keySchema.add(new KeySchemaElement().withAttributeName("rangeS").withKeyType(KeyType.RANGE)); + + CreateTableRequest request = new CreateTableRequest() + .withTableName(stringHashStringRangeTableName) + .withKeySchema(keySchema) + .withAttributeDefinitions(attributeDefinitions) + .withProvisionedThroughput(new ProvisionedThroughput() + .withReadCapacityUnits(5L) + .withWriteCapacityUnits(6L)); + Table stringHashStringRangeTable = dynamoDB.createTable(request); + stringHashStringRangeTable.waitForActive(); + + attributeDefinitions = new ArrayList<AttributeDefinition>(); + attributeDefinitions + .add(new AttributeDefinition().withAttributeName("hashN").withAttributeType("N")); + attributeDefinitions + .add(new AttributeDefinition().withAttributeName("rangeN").withAttributeType("N")); + + keySchema = new ArrayList<KeySchemaElement>(); + keySchema.add(new KeySchemaElement().withAttributeName("hashN").withKeyType(KeyType.HASH)); + keySchema.add(new KeySchemaElement().withAttributeName("rangeN").withKeyType(KeyType.RANGE)); + + request = new CreateTableRequest() + .withTableName(numberHashNumberRangeTableName) + .withKeySchema(keySchema) + .withAttributeDefinitions(attributeDefinitions) + .withProvisionedThroughput(new ProvisionedThroughput() + .withReadCapacityUnits(5L) + .withWriteCapacityUnits(6L)); + Table numberHashNumberRangeTable = dynamoDB.createTable(request); + numberHashNumberRangeTable.waitForActive(); + + attributeDefinitions = new ArrayList<AttributeDefinition>(); + attributeDefinitions + .add(new AttributeDefinition().withAttributeName("hashN").withAttributeType("N")); + + keySchema = new ArrayList<KeySchemaElement>(); + keySchema.add(new KeySchemaElement().withAttributeName("hashN").withKeyType(KeyType.HASH)); + + request = new CreateTableRequest() + .withTableName(numberHashOnlyTableName) + .withKeySchema(keySchema) + .withAttributeDefinitions(attributeDefinitions) + .withProvisionedThroughput(new ProvisionedThroughput() + .withReadCapacityUnits(5L) + .withWriteCapacityUnits(6L)); + Table numberHashOnlyTable = dynamoDB.createTable(request); + numberHashOnlyTable.waitForActive(); + + } + + protected static void validateServiceExceptionAttribute(FlowFile flowFile) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_CODE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_MESSAGE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_TYPE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_SERVICE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_REQUEST_ID)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_STATUS_CODE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_RETRYABLE)); + } + + @AfterClass + public static void afterClass() { + DeleteTableResult result = amazonDynamoDBClient.deleteTable(stringHashStringRangeTableName); + result = amazonDynamoDBClient.deleteTable(numberHashNumberRangeTableName); + result = amazonDynamoDBClient.deleteTable(numberHashOnlyTableName); + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java new file mode 100644 index 0000000..bbd341c --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import static org.junit.Assert.assertEquals; + +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public class ITPutGetDeleteGetDynamoDBTest extends ITAbstractDynamoDBTest { + + + @Test + public void testStringHashStringRangePutGetDeleteGetSuccess() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); + + flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); + + deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE); + deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION); + deleteRunner.setProperty(DeleteDynamoDB.TABLE, stringHashStringRangeTableName); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashS"); + deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_NAME, "rangeS"); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "h1"); + deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_VALUE, "r1"); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 1); + + flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals("", new String(flowFile.toByteArray())); + } + + // Final check after delete + final TestRunner getRunnerAfterDelete = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunnerAfterDelete.enqueue(new byte[] {}); + + getRunnerAfterDelete.run(1); + getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1); + + flowFiles = getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND); + for (MockFlowFile flowFile : flowFiles) { + String error = flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND); + assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE)); + } + + } + + @Test + public void testStringHashStringRangePutDeleteWithHashOnlyFailure() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); + + flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); + + deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE); + deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION); + deleteRunner.setProperty(DeleteDynamoDB.TABLE, stringHashStringRangeTableName); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashS"); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "h1"); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_FAILURE, 1); + + flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + validateServiceExceptionAttribute(flowFile); + assertEquals("", new String(flowFile.toByteArray())); + } + + } + + @Test + public void testStringHashStringRangePutGetWithHashOnlyKeyFailure() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + validateServiceExceptionAttribute(flowFile); + assertEquals("", new String(flowFile.toByteArray())); + } + + + } + + @Test + public void testNumberHashOnlyPutGetDeleteGetSuccess() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, numberHashOnlyTableName); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashN"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "40"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"age\":40}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashOnlyTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); + + flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); + + deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE); + deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION); + deleteRunner.setProperty(DeleteDynamoDB.TABLE, numberHashOnlyTableName); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashN"); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "40"); + deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 1); + + flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals("", new String(flowFile.toByteArray())); + } + + // Final check after delete + final TestRunner getRunnerAfterDelete = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashOnlyTableName); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40"); + getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunnerAfterDelete.enqueue(new byte[] {}); + + getRunnerAfterDelete.run(1); + getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1); + + flowFiles = getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND); + for (MockFlowFile flowFile : flowFiles) { + String error = flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND); + assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE)); + } + + } + + @Test + public void testNumberHashNumberRangePutGetDeleteGetSuccess() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractWriteDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.TABLE, numberHashNumberRangeTableName); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_NAME, "hashN"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_NAME, "rangeN"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE, "40"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE, "50"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"40\":\"50\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner getRunner = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunner.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunner.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashNumberRangeTableName); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeN"); + getRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40"); + getRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "50"); + getRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + getRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + getRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunner.enqueue(new byte[] {}); + + getRunner.run(1); + + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); + + flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + assertEquals(document, new String(flowFile.toByteArray())); + } + + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); + + deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE); + deleteRunner.setProperty(DeleteDynamoDB.REGION, REGION); + deleteRunner.setProperty(DeleteDynamoDB.TABLE, numberHashNumberRangeTableName); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_NAME, "hashN"); + deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_NAME, "rangeN"); + deleteRunner.setProperty(DeleteDynamoDB.HASH_KEY_VALUE, "40"); + deleteRunner.setProperty(DeleteDynamoDB.RANGE_KEY_VALUE, "50"); + deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + deleteRunner.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + deleteRunner.enqueue(new byte[] {}); + + deleteRunner.run(1); + + deleteRunner.assertAllFlowFilesTransferred(DeleteDynamoDB.REL_SUCCESS, 1); + + flowFiles = deleteRunner.getFlowFilesForRelationship(DeleteDynamoDB.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals("", new String(flowFile.toByteArray())); + } + + // Final check after delete + final TestRunner getRunnerAfterDelete = TestRunners.newTestRunner(GetDynamoDB.class); + + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.TABLE, numberHashNumberRangeTableName); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashN"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeN"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "40"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "50"); + getRunnerAfterDelete.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.HASH_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + getRunnerAfterDelete.setProperty(AbstractWriteDynamoDBProcessor.RANGE_KEY_VALUE_TYPE, AbstractWriteDynamoDBProcessor.ALLOWABLE_VALUE_NUMBER); + getRunnerAfterDelete.enqueue(new byte[] {}); + + getRunnerAfterDelete.run(1); + getRunnerAfterDelete.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1); + + flowFiles = getRunnerAfterDelete.getFlowFilesForRelationship(GetDynamoDB.REL_NOT_FOUND); + for (MockFlowFile flowFile : flowFiles) { + String error = flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND); + assertTrue(error.startsWith(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE)); + } + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java new file mode 100644 index 0000000..62b970b --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ItemKeysTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class ItemKeysTest { + + @Test + public void testHashNullRangeNullEquals() { + ItemKeys ik1 = new ItemKeys(null, null); + ItemKeys ik2 = new ItemKeys(null, null); + assertEquals(ik1, ik2); + assertEquals(ik1.hashCode(), ik2.hashCode()); + assertEquals(ik1.toString(), ik2.toString()); + } + + @Test + public void testHashNotNullRangeNullEquals() { + ItemKeys ik1 = new ItemKeys("abc", null); + ItemKeys ik2 = new ItemKeys("abc", null); + assertEquals(ik1, ik2); + assertEquals(ik1.hashCode(), ik2.hashCode()); + assertEquals(ik1.toString(), ik2.toString()); + } + + @Test + public void testHashNullRangeNotNullEquals() { + ItemKeys ik1 = new ItemKeys(null, "ab"); + ItemKeys ik2 = new ItemKeys(null, "ab"); + assertEquals(ik1, ik2); + assertEquals(ik1.hashCode(), ik2.hashCode()); + assertEquals(ik1.toString(), ik2.toString()); + } + + @Test + public void testHashNotNullRangeNotNullEquals() { + ItemKeys ik1 = new ItemKeys("abc", "pqr"); + ItemKeys ik2 = new ItemKeys("abc", "pqr"); + assertEquals(ik1, ik2); + assertEquals(ik1.hashCode(), ik2.hashCode()); + assertEquals(ik1.toString(), ik2.toString()); + } + + @Test + public void testHashNotNullRangeNotNullForOtherNotEquals() { + ItemKeys ik1 = new ItemKeys(null, "ab"); + ItemKeys ik2 = new ItemKeys("ab", null); + assertFalse(ik1.equals(ik2)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3155a8a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java new file mode 100644 index 0000000..39b5609 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDBTest.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.dynamodb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION; +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.TableWriteItems; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult; +import com.amazonaws.services.dynamodbv2.model.PutRequest; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; + +public class PutDynamoDBTest { + + protected PutDynamoDB putDynamoDB; + protected BatchWriteItemResult result = new BatchWriteItemResult(); + BatchWriteItemOutcome outcome; + + @Before + public void setUp() { + outcome = new BatchWriteItemOutcome(result); + result.setUnprocessedItems(new HashMap<String, List<WriteRequest>>()); + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + @Override + public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) { + return outcome; + } + }; + + putDynamoDB = new PutDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + + } + + @Test + public void testStringHashStringRangePutOnlyHashFailure() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"hello\": 2}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + ITAbstractDynamoDBTest.validateServiceExceptionAttribute(flowFile); + } + + } + + @Test + public void testStringHashStringRangePutNoHashValueFailure() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"hello\": 2}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_HASH_KEY_VALUE_ERROR)); + } + + } + + @Test + public void testStringHashStringRangePutOnlyHashWithRangeValueNoRangeNameFailure() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "document"); + putRunner.enqueue(new byte[] {}); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR)); + } + + } + + @Test + public void testStringHashStringRangePutOnlyHashWithRangeNameNoRangeValueFailure() { + final TestRunner putRunner = TestRunners.newTestRunner(PutDynamoDB.class); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j1"); + putRunner.enqueue(new byte[] {}); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_RANGE_KEY_VALUE_ERROR)); + } + } + + @Test + public void testStringHashStringRangePutSuccessfulWithMock() { + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_SUCCESS, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + + } + + @Test + public void testStringHashStringRangePutOneSuccessfulOneSizeFailureWithMockBatchSize1() { + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1]; + for (int i = 0; i < item.length; i++) { + item[i] = 'a'; + } + String document2 = new String(item); + putRunner.enqueue(document2.getBytes()); + + putRunner.run(2,true,true); + + List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFilesFailed) { + System.out.println(flowFile.getAttributes()); + flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR); + assertEquals(item.length,flowFile.getSize()); + } + + List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesSuccessful) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + } + + @Test + public void testStringHashStringRangePutOneSuccessfulOneSizeFailureWithMockBatchSize5() { + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + putRunner.setProperty(AbstractDynamoDBProcessor.BATCH_SIZE, "5"); + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1]; + for (int i = 0; i < item.length; i++) { + item[i] = 'a'; + } + String document2 = new String(item); + putRunner.enqueue(document2.getBytes()); + + putRunner.run(1); + + List<MockFlowFile> flowFilesFailed = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFilesFailed) { + System.out.println(flowFile.getAttributes()); + flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR); + assertEquals(item.length,flowFile.getSize()); + } + + List<MockFlowFile> flowFilesSuccessful = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesSuccessful) { + System.out.println(flowFile.getAttributes()); + assertEquals(document, new String(flowFile.toByteArray())); + } + } + + @Test + public void testStringHashStringRangePutFailedWithItemSizeGreaterThan400Kb() { + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + byte [] item = new byte[PutDynamoDB.DYNAMODB_MAX_ITEM_SIZE + 1]; + for (int i = 0; i < item.length; i++) { + item[i] = 'a'; + } + String document = new String(item); + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractWriteDynamoDBProcessor.REL_FAILURE, 1); + + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractWriteDynamoDBProcessor.REL_FAILURE); + assertEquals(1,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + flowFile.assertAttributeExists(PutDynamoDB.AWS_DYNAMO_DB_ITEM_SIZE_ERROR); + assertEquals(item.length,flowFile.getSize()); + } + + } + + @Test + public void testStringHashStringRangePutThrowsServiceException() { + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + @Override + public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) { + throw new AmazonServiceException("serviceException"); + } + }; + + putDynamoDB = new PutDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertEquals("serviceException (Service: null; Status Code: 0; Error Code: null; Request ID: null)", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + } + + } + + @Test + public void testStringHashStringRangePutThrowsClientException() { + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + @Override + public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) { + throw new AmazonClientException("clientException"); + } + }; + + putDynamoDB = new PutDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertEquals("clientException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + } + + } + + @Test + public void testStringHashStringRangePutThrowsRuntimeException() { + final DynamoDB mockDynamoDB = new DynamoDB(Regions.AP_NORTHEAST_1) { + @Override + public BatchWriteItemOutcome batchWriteItem(TableWriteItems... tableWriteItems) { + throw new RuntimeException("runtimeException"); + } + }; + + putDynamoDB = new PutDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + }; + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractWriteDynamoDBProcessor.JSON_DOCUMENT, "document"); + String document = "{\"name\":\"john\"}"; + putRunner.enqueue(document.getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); + List<MockFlowFile> flowFiles = putRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); + for (MockFlowFile flowFile : flowFiles) { + assertEquals("runtimeException", flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_ERROR_EXCEPTION_MESSAGE)); + } + + } + + @Test + public void testStringHashStringRangePutSuccessfulWithMockOneUnprocessed() { + Map<String, List<WriteRequest>> unprocessed = + new HashMap<String, List<WriteRequest>>(); + PutRequest put = new PutRequest(); + put.addItemEntry("hashS", new AttributeValue("h1")); + put.addItemEntry("rangeS", new AttributeValue("r1")); + WriteRequest write = new WriteRequest(put); + List<WriteRequest> writes = new ArrayList<>(); + writes.add(write); + unprocessed.put(stringHashStringRangeTableName, writes); + result.setUnprocessedItems(unprocessed); + final TestRunner putRunner = TestRunners.newTestRunner(putDynamoDB); + + putRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); + putRunner.setProperty(AbstractDynamoDBProcessor.SECRET_KEY, "cdef"); + putRunner.setProperty(AbstractDynamoDBProcessor.REGION, REGION); + putRunner.setProperty(AbstractDynamoDBProcessor.TABLE, stringHashStringRangeTableName); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_NAME, "hashS"); + putRunner.setProperty(AbstractDynamoDBProcessor.HASH_KEY_VALUE, "h1"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_NAME, "rangeS"); + putRunner.setProperty(AbstractDynamoDBProcessor.RANGE_KEY_VALUE, "r1"); + putRunner.setProperty(AbstractDynamoDBProcessor.JSON_DOCUMENT, "j2"); + putRunner.enqueue("{\"hello\":\"world\"}".getBytes()); + + putRunner.run(1); + + putRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1); + + } + +}
