ChrisSamo632 commented on a change in pull request #5504:
URL: https://github.com/apache/nifi/pull/5504#discussion_r762613901
##########
File path:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
##########
@@ -100,42 +104,75 @@
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- List<FlowFile> flowFiles =
session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
- if (flowFiles == null || flowFiles.size() == 0) {
- return;
- }
+ public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new
ArrayList<>(super.verify(context, verificationLogger, attributes));
- Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+ final TableKeysAndAttributes tableKeysAndAttributes =
getTableKeysAndAttributes(context, attributes);
final String table =
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
- TableKeysAndAttributes tableKeysAndAttributes = new
TableKeysAndAttributes(table);
-
- final String hashKeyName =
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
- final String rangeKeyName =
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
final String jsonDocument =
context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
- for (FlowFile flowFile : flowFiles) {
- final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, flowFile);
- final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+ if (tableKeysAndAttributes.getPrimaryKeys().isEmpty()) {
- if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue,
session, flowFile)) {
- continue;
- }
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.SKIPPED)
+ .verificationStepName("Get DynamoDB Items")
+ .explanation(String.format("Skipped getting DynamoDB items
because no primary keys would be included in retrieval"))
+ .build());
+ } else {
+ try {
+ final DynamoDB dynamoDB =
getDynamoDB(getConfiguration(context).getClient());
+ int totalCount = 0;
+ int jsonDocumentCount = 0;
- if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue,
session, flowFile) ) {
- continue;
- }
+ BatchGetItemOutcome result =
dynamoDB.batchGetItem(tableKeysAndAttributes);
Review comment:
Would it be better to do some sort of "DynamoDB Table Exists" rather
than fetching data - this could potentially be costly if a lot of data is
pulled back over the internet?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]