ChrisSamo632 commented on a change in pull request #5504:
URL: https://github.com/apache/nifi/pull/5504#discussion_r762612478
##########
File path:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
##########
@@ -161,38 +198,90 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
// Handle unprocessed keys
- Map<String, KeysAndAttributes> unprocessedKeys =
result.getUnprocessedKeys();
+ final Map<String, KeysAndAttributes> unprocessedKeys =
result.getUnprocessedKeys();
if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
- KeysAndAttributes keysAndAttributes =
unprocessedKeys.get(table);
- List<Map<String, AttributeValue>> keys =
keysAndAttributes.getKeys();
+ final KeysAndAttributes keysAndAttributes =
unprocessedKeys.get(table);
+ final List<Map<String, AttributeValue>> keys =
keysAndAttributes.getKeys();
- for (Map<String,AttributeValue> unprocessedKey : keys) {
- Object hashKeyValue = getAttributeValue(context,
HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
- Object rangeKeyValue = getAttributeValue(context,
RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
+ for (final Map<String,AttributeValue> unprocessedKey : keys) {
+ final Object hashKeyValue = getAttributeValue(context,
HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
+ final Object rangeKeyValue = getAttributeValue(context,
RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
sendUnprocessedToUnprocessedRelationship(session,
keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
// Handle any remaining items
- for (ItemKeys key : keysToFlowFileMap.keySet()) {
+ for (final ItemKeys key : keysToFlowFileMap.keySet()) {
FlowFile flowFile = keysToFlowFileMap.get(key);
flowFile = session.putAttribute(flowFile,
DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE +
key.toString() );
session.transfer(flowFile,REL_NOT_FOUND);
keysToFlowFileMap.remove(key);
}
- } catch(AmazonServiceException exception) {
+ } catch(final AmazonServiceException exception) {
getLogger().error("Could not process flowFiles due to service
exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session,
flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
- } catch(AmazonClientException exception) {
+ } catch(final AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client
exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session,
flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
- } catch(Exception exception) {
+ } catch(final Exception exception) {
getLogger().error("Could not process flowFiles due to exception :
" + exception.getMessage());
List<FlowFile> failedFlowFiles = processException(session,
flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
}
}
+
+ private Map<ItemKeys, FlowFile> getKeysToFlowFileMap(final ProcessContext
context, final ProcessSession session, final List<FlowFile> flowFiles) {
+ Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
Review comment:
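`final` - `keysToFlowFileMap` can be declared `final` here, consistent with the other local variables made `final` in this change.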
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org