This is an automated email from the ASF dual-hosted git repository.
chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f07c372 NIFI-9353: Adding Config Verification to AWS Processors
f07c372 is described below
commit f07c37285ebfb6af33fa8a5906bd605890e66908
Author: Joe Gresock <[email protected]>
AuthorDate: Sun Oct 31 07:05:08 2021 -0400
NIFI-9353: Adding Config Verification to AWS Processors
This closes #5504
---
.../AbstractAWSCredentialsProviderProcessor.java | 64 +++---
.../nifi/processors/aws/AbstractAWSProcessor.java | 54 ++++-
.../aws/dynamodb/AbstractDynamoDBProcessor.java | 65 +++---
.../aws/wag/AbstractAWSGatewayApiProcessor.java | 109 +++++------
.../processors/aws/dynamodb/DeleteDynamoDB.java | 4 +-
.../nifi/processors/aws/dynamodb/GetDynamoDB.java | 218 +++++++++++++++------
.../nifi/processors/aws/dynamodb/PutDynamoDB.java | 4 +-
.../nifi/processors/aws/s3/FetchS3Object.java | 163 ++++++++++-----
.../org/apache/nifi/processors/aws/s3/ListS3.java | 11 +-
.../processors/aws/wag/InvokeAWSGatewayApi.java | 156 ++++++++++-----
.../processors/aws/dynamodb/GetDynamoDBTest.java | 156 ++++++++-------
.../dynamodb/ITPutGetDeleteGetDynamoDBTest.java | 7 +
.../nifi/processors/aws/s3/TestFetchS3Object.java | 36 +++-
.../apache/nifi/processors/aws/s3/TestListS3.java | 4 +-
.../aws/wag/TestInvokeAmazonGatewayApiMock.java | 17 +-
15 files changed, 706 insertions(+), 362 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
index 923878c..8b2c560 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
@@ -15,16 +15,23 @@
* limitations under the License.
*/
package org.apache.nifi.processors.aws;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
import
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
-import com.amazonaws.AmazonWebServiceClient;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
/**
* Base class for aws processors that uses AWSCredentialsProvider interface
for creating aws clients.
@@ -34,7 +41,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
* @see <a
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
*/
public abstract class AbstractAWSCredentialsProviderProcessor<ClientType
extends AmazonWebServiceClient>
- extends AbstractAWSProcessor<ClientType> {
+ extends AbstractAWSProcessor<ClientType> implements VerifiableProcessor {
/**
* AWS credentials provider service
@@ -49,31 +56,21 @@ public abstract class
AbstractAWSCredentialsProviderProcessor<ClientType extends
.build();
/**
- * This method checks if {#link {@link #AWS_CREDENTIALS_PROVIDER_SERVICE}
is available and if it
- * is, uses the credentials provider, otherwise it invokes the {@link
AbstractAWSProcessor#onScheduled(ProcessContext)}
- * which uses static AWSCredentials for the aws processors
+ * Attempts to create the client using the controller service first before
falling back to the standard configuration.
+ * @param context The process context
+ * @return The created client
*/
- @OnScheduled
- public void onScheduled(ProcessContext context) {
- ControllerService service =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+ protected ClientType createClient(final ProcessContext context) {
+ final ControllerService service =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
if (service != null) {
getLogger().debug("Using aws credentials provider service for
creating client");
- onScheduledUsingControllerService(context);
+ return createClient(context, getCredentialsProvider(context),
createConfiguration(context));
} else {
getLogger().debug("Using aws credentials for creating client");
- super.onScheduled(context);
+ return super.createClient(context);
}
}
- /**
- * Create aws client using credentials provider
- * @param context the process context
- */
- protected void onScheduledUsingControllerService(ProcessContext context) {
- this.client = createClient(context, getCredentialsProvider(context),
createConfiguration(context));
- super.initializeRegionAndEndpoint(context, this.client);
- }
-
@OnShutdown
public void onShutDown() {
if ( this.client != null ) {
@@ -81,6 +78,29 @@ public abstract class
AbstractAWSCredentialsProviderProcessor<ClientType extends
}
}
+ @Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>();
+
+ try {
+ getConfiguration(context);
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.SUCCESSFUL)
+ .verificationStepName("Create Client and Configure Region")
+ .explanation("Successfully created AWS Client and
configured Region")
+ .build());
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to create AWS Client", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.FAILED)
+ .verificationStepName("Create Client and Configure Region")
+ .explanation("Failed to crete AWS Client or configure
Region: " + e.getMessage())
+ .build());
+ }
+
+ return results;
+ }
+
/**
* Get credentials provider using the {@link AWSCredentialsProviderService}
* @param context the process context
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 6609beb..9a9e0f2 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -276,8 +276,7 @@ public abstract class AbstractAWSProcessor<ClientType
extends AmazonWebServiceCl
@OnScheduled
public void onScheduled(final ProcessContext context) {
- this.client = createClient(context, getCredentials(context),
createConfiguration(context));
- initializeRegionAndEndpoint(context, this.client);
+ setClientAndRegion(context);
}
/*
@@ -302,10 +301,6 @@ public abstract class AbstractAWSProcessor<ClientType
extends AmazonWebServiceCl
*/
public abstract void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException;
- protected void initializeRegionAndEndpoint(final ProcessContext context,
final AmazonWebServiceClient client) {
- this.region = getRegionAndInitializeEndpoint(context, client);
- }
-
protected Region getRegionAndInitializeEndpoint(final ProcessContext
context, final AmazonWebServiceClient client) {
final Region region;
// if the processor supports REGION, get the configured region.
@@ -336,7 +331,7 @@ public abstract class AbstractAWSProcessor<ClientType
extends AmazonWebServiceCl
// falling back to the configured region if the parse fails
// e.g. in case of
https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
String regionValue = parseRegionForVPCE(urlstr,
region.getName());
- client.setEndpoint(urlstr, this.client.getServiceName(),
regionValue);
+ client.setEndpoint(urlstr, client.getServiceName(),
regionValue);
} else {
// handling non-vpce custom endpoints where the AWS
library can parse the region out
// e.g. https://sqs.{region}.***.***.***.gov
@@ -416,4 +411,49 @@ public abstract class AbstractAWSProcessor<ClientType
extends AmazonWebServiceCl
getClient().shutdown();
}
}
+
+ protected void setClientAndRegion(final ProcessContext context) {
+ final AWSConfiguration awsConfiguration = getConfiguration(context);
+ this.client = awsConfiguration.getClient();
+ this.region = awsConfiguration.getRegion();
+ }
+
+ /**
+ * Creates an AWS service client from the context.
+ * @param context The process context
+ * @return The created client
+ */
+ protected ClientType createClient(final ProcessContext context) {
+ return createClient(context, getCredentials(context),
createConfiguration(context));
+ }
+
+ /**
+ * Parses and configures the client and region from the context.
+ * @param context The process context
+ * @return The parsed configuration
+ */
+ protected AWSConfiguration getConfiguration(final ProcessContext context) {
+ final ClientType client = createClient(context);
+ final Region region = getRegionAndInitializeEndpoint(context, client);
+
+ return new AWSConfiguration(client, region);
+ }
+
+ public class AWSConfiguration {
+ final ClientType client;
+ final Region region;
+
+ public AWSConfiguration(final ClientType client, final Region region) {
+ this.client = client;
+ this.region = region;
+ }
+
+ public ClientType getClient() {
+ return client;
+ }
+
+ public Region getRegion() {
+ return region;
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
index 6c5d4f6..40388b0 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
@@ -199,15 +199,15 @@ public abstract class AbstractDynamoDBProcessor extends
AbstractAWSCredentialsPr
return client;
}
- protected Object getValue(ProcessContext context, PropertyDescriptor type,
PropertyDescriptor value, FlowFile flowFile) {
+ protected Object getValue(final ProcessContext context, final
PropertyDescriptor type, final PropertyDescriptor value, final Map<String,
String> attributes) {
if (
context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue()))
{
- return
context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue();
+ return
context.getProperty(value).evaluateAttributeExpressions(attributes).getValue();
} else {
- return new
BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue());
+ return new
BigDecimal(context.getProperty(value).evaluateAttributeExpressions(attributes).getValue());
}
}
- protected Object getAttributeValue(ProcessContext context,
PropertyDescriptor propertyType, AttributeValue value) {
+ protected Object getAttributeValue(final ProcessContext context, final
PropertyDescriptor propertyType, final AttributeValue value) {
if (
context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue()))
{
if ( value == null ) return null;
else return value.getS();
@@ -217,9 +217,14 @@ public abstract class AbstractDynamoDBProcessor extends
AbstractAWSCredentialsPr
}
}
+ protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) {
+ return new DynamoDB(client);
+ }
+
protected synchronized DynamoDB getDynamoDB() {
- if ( dynamoDB == null )
- dynamoDB = new DynamoDB(client);
+ if (dynamoDB == null) {
+ dynamoDB = getDynamoDB(client);
+ }
return dynamoDB;
}
@@ -297,28 +302,33 @@ public abstract class AbstractDynamoDBProcessor extends
AbstractAWSCredentialsPr
keysToFlowFileMap.remove(itemKeys);
}
- protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object
rangeKeyValue, ProcessSession session,
- FlowFile flowFile) {
+ protected boolean isRangeKeyValueConsistent(final String rangeKeyName,
final Object rangeKeyValue, final ProcessSession session, FlowFile flowFile) {
+ try {
+ validateRangeKeyValue(rangeKeyName, rangeKeyValue);
+ } catch (final IllegalArgumentException e) {
+ getLogger().error(e.getMessage() + ": " + flowFile, e);
+ flowFile = session.putAttribute(flowFile,
DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
+ + "'/value '" + rangeKeyValue + "' inconsistency error");
+ session.transfer(flowFile, REL_FAILURE);
+ return false;
+ }
+
+ return true;
+ }
+
+ protected void validateRangeKeyValue(final String rangeKeyName, final
Object rangeKeyValue) {
boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
boolean isRangeValueNull = rangeKeyValue == null;
boolean isConsistent = true;
- if ( ! isRangeNameBlank && (isRangeValueNull ||
StringUtils.isBlank(rangeKeyValue.toString()))) {
+ if (!isRangeNameBlank && (isRangeValueNull ||
StringUtils.isBlank(rangeKeyValue.toString()))) {
isConsistent = false;
}
- if ( isRangeNameBlank && ( ! isRangeValueNull && !
StringUtils.isBlank(rangeKeyValue.toString()))) {
+ if (isRangeNameBlank && (!isRangeValueNull &&
!StringUtils.isBlank(rangeKeyValue.toString()))) {
isConsistent = false;
}
-
- if ( ! isConsistent ) {
- getLogger().error("Range key name '" + rangeKeyName + "' was not
consistent with range value "
- + rangeKeyValue + "'" + flowFile);
- flowFile = session.putAttribute(flowFile,
DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
- + "'/value '" + rangeKeyValue + "' inconsistency error");
- session.transfer(flowFile, REL_FAILURE);
+ if (!isConsistent) {
+ throw new IllegalArgumentException(String.format("Range key name
'%s' was not consistent with range value '%s'", rangeKeyName, rangeKeyValue));
}
-
- return isConsistent;
-
}
protected boolean isHashKeyValueConsistent(String hashKeyName, Object
hashKeyValue, ProcessSession session,
@@ -326,10 +336,11 @@ public abstract class AbstractDynamoDBProcessor extends
AbstractAWSCredentialsPr
boolean isConsistent = true;
- if ( hashKeyValue == null ||
StringUtils.isBlank(hashKeyValue.toString())) {
- getLogger().error("Hash key value '" + hashKeyValue + "' is
required for flow file " + flowFile);
- flowFile = session.putAttribute(flowFile,
DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName
- + "/value '" + hashKeyValue + "' inconsistency error");
+ try {
+ validateHashKeyValue(hashKeyValue);
+ } catch (final IllegalArgumentException e) {
+ getLogger().error(e.getMessage() + ": " + flowFile, e);
+ flowFile = session.putAttribute(flowFile,
DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName + "/value '" +
hashKeyValue + "' inconsistency error");
session.transfer(flowFile, REL_FAILURE);
isConsistent = false;
}
@@ -338,6 +349,12 @@ public abstract class AbstractDynamoDBProcessor extends
AbstractAWSCredentialsPr
}
+ protected void validateHashKeyValue(final Object hashKeyValue) {
+ if (hashKeyValue == null ||
StringUtils.isBlank(hashKeyValue.toString())) {
+ throw new IllegalArgumentException(String.format("Hash key value
is required. Provided value was '%s'", hashKeyValue));
+ }
+ }
+
@OnStopped
public void onStopped() {
this.dynamoDB = null;
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
index 0c310db..7cde727 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
@@ -97,7 +97,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
public AbstractAWSGatewayApiProcessor() {
}
- public AbstractAWSGatewayApiProcessor(AmazonHttpClient client) {
+ public AbstractAWSGatewayApiProcessor(final AmazonHttpClient client) {
providedClient = client;
}
@@ -280,8 +280,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
.build();
@Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
- String propertyDescriptorName) {
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
@@ -318,14 +317,13 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
}
@Override
- protected Collection<ValidationResult> customValidate(
- final ValidationContext validationContext) {
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(3);
results.addAll(super.customValidate(validationContext));
final boolean querySet =
validationContext.getProperty(PROP_QUERY_PARAMS).isSet();
if (querySet) {
- String input =
validationContext.getProperty(PROP_QUERY_PARAMS).getValue();
+ final String input =
validationContext.getProperty(PROP_QUERY_PARAMS).getValue();
// if we have expressions, we don't do further validation
if
(!(validationContext.isExpressionLanguageSupported(PROP_QUERY_PARAMS.getName())
&& validationContext.isExpressionLanguagePresent(input))) {
@@ -350,15 +348,15 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
}
}
}
- String method =
trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue())
+ final String method =
trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue())
.toUpperCase();
// if there are expressions do not validate
if
(!(validationContext.isExpressionLanguageSupported(PROP_METHOD.getName())
&& validationContext.isExpressionLanguagePresent(method))) {
try {
- HttpMethodName methodName = HttpMethodName.fromValue(method);
- } catch (IllegalArgumentException e) {
+ HttpMethodName.fromValue(method);
+ } catch (final IllegalArgumentException e) {
results.add(new
ValidationResult.Builder().subject(PROP_METHOD.getName()).input(method)
.explanation("Unsupported METHOD")
.valid(false).build());
@@ -369,9 +367,9 @@ public abstract class AbstractAWSGatewayApiProcessor extends
}
@Override
- protected GenericApiGatewayClient createClient(ProcessContext context,
- AWSCredentialsProvider
awsCredentialsProvider,
- ClientConfiguration
clientConfiguration) {
+ protected GenericApiGatewayClient createClient(final ProcessContext
context,
+ final
AWSCredentialsProvider awsCredentialsProvider,
+ final ClientConfiguration
clientConfiguration) {
GenericApiGatewayClientBuilder builder = new
GenericApiGatewayClientBuilder()
.withCredentials(awsCredentialsProvider).withClientConfiguration(clientConfiguration)
@@ -392,33 +390,34 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
protected GenericApiGatewayClient createClient(final ProcessContext
context,
final AWSCredentials
credentials,
final ClientConfiguration
clientConfiguration) {
- return createClient(context, new
AWSStaticCredentialsProvider(credentials),
- clientConfiguration);
+ return createClient(context, new
AWSStaticCredentialsProvider(credentials), clientConfiguration);
}
protected GenericApiGatewayRequest configureRequest(final ProcessContext
context,
final ProcessSession
session,
final String
resourcePath,
- final FlowFile
requestFlowFile) {
- String method = trimToEmpty(
+ final FlowFile
requestFlowFile,
+ final Map<String,
String> attributes) {
+ final String method = trimToEmpty(
context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile)
.getValue()).toUpperCase();
- HttpMethodName methodName = HttpMethodName.fromValue(method);
- return configureRequest(context, session,
resourcePath,requestFlowFile, methodName);
+ final HttpMethodName methodName = HttpMethodName.fromValue(method);
+ return configureRequest(context, session,
resourcePath,requestFlowFile, methodName, attributes);
}
protected GenericApiGatewayRequest configureRequest(final ProcessContext
context,
final ProcessSession
session,
final String
resourcePath,
final FlowFile
requestFlowFile,
- final HttpMethodName
methodName) {
+ final HttpMethodName
methodName,
+ final Map<String,
String> attributes) {
GenericApiGatewayRequestBuilder builder = new
GenericApiGatewayRequestBuilder()
.withResourcePath(resourcePath);
final Map<String, List<String>> parameters = getParameters(context);
builder = builder.withParameters(parameters);
- InputStream requestBody = null;
+ InputStream requestBody;
switch (methodName) {
case GET:
builder = builder.withHttpMethod(HttpMethodName.GET);
@@ -447,7 +446,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
break;
}
- builder = setHeaderProperties(context, builder, methodName,
requestFlowFile);
+ builder = setHeaderProperties(context, builder, methodName,
attributes);
return builder.build();
}
@@ -456,7 +455,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
final FlowFile requestFlowFile)
{
if (context.getProperty(PROP_SEND_BODY).asBoolean() && requestFlowFile
!= null) {
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
session.exportTo(requestFlowFile, outputStream);
return new ByteArrayInputStream(outputStream.toByteArray());
@@ -467,24 +466,22 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
protected GenericApiGatewayRequestBuilder setHeaderProperties(final
ProcessContext context,
GenericApiGatewayRequestBuilder requestBuilder,
-
HttpMethodName methodName,
- final
FlowFile requestFlowFile) {
+ final
HttpMethodName methodName,
+ final
Map<String, String> requestAttributes) {
- Map<String, String> headers = new HashMap<>();
- for (String headerKey : dynamicPropertyNames) {
- String headerValue = context.getProperty(headerKey)
-
.evaluateAttributeExpressions(requestFlowFile).getValue();
+ final Map<String, String> headers = new HashMap<>();
+ for (final String headerKey : dynamicPropertyNames) {
+ final String headerValue =
context.getProperty(headerKey).evaluateAttributeExpressions(requestAttributes).getValue();
headers.put(headerKey, headerValue);
}
// iterate through the flowfile attributes, adding any attribute that
// matches the attributes-to-send pattern. if the pattern is not set
// (it's an optional property), ignore that attribute entirely
- if (regexAttributesToSend != null && requestFlowFile != null) {
- Map<String, String> attributes = requestFlowFile.getAttributes();
- Matcher m = regexAttributesToSend.matcher("");
- for (Map.Entry<String, String> entry : attributes.entrySet()) {
- String headerKey = trimToEmpty(entry.getKey());
+ if (regexAttributesToSend != null) {
+ final Matcher m = regexAttributesToSend.matcher("");
+ for (final Map.Entry<String, String> entry :
requestAttributes.entrySet()) {
+ final String headerKey = trimToEmpty(entry.getKey());
// don't include any of the ignored attributes
if (IGNORED_ATTRIBUTES.contains(headerKey)) {
@@ -502,8 +499,8 @@ public abstract class AbstractAWSGatewayApiProcessor extends
}
String contentType = context.getProperty(PROP_CONTENT_TYPE)
-
.evaluateAttributeExpressions(requestFlowFile).getValue();
- boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();
+
.evaluateAttributeExpressions(requestAttributes).getValue();
+ final boolean sendBody =
context.getProperty(PROP_SEND_BODY).asBoolean();
contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE
: contentType;
if (methodName == HttpMethodName.PUT || methodName ==
HttpMethodName.POST
|| methodName == HttpMethodName.PATCH) {
@@ -527,23 +524,23 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
* @param context ProcessContext
* @return Map of names and values
*/
- protected Map<String, List<String>> getParameters(ProcessContext context) {
+ protected Map<String, List<String>> getParameters(final ProcessContext
context) {
if (!context.getProperty(PROP_QUERY_PARAMS).isSet()) {
return new HashMap<>();
}
final String queryString = context.getProperty(PROP_QUERY_PARAMS)
.evaluateAttributeExpressions().getValue();
- List<NameValuePair> params = URLEncodedUtils
+ final List<NameValuePair> params = URLEncodedUtils
.parse(queryString, Charsets.toCharset("UTF-8"));
if (params.isEmpty()) {
return new HashMap<>();
}
- Map<String, List<String>> map = new HashMap<>();
+ final Map<String, List<String>> map = new HashMap<>();
- for (NameValuePair nvp : params) {
+ for (final NameValuePair nvp : params) {
if (!map.containsKey(nvp.getName())) {
map.put(nvp.getName(), new ArrayList<>());
}
@@ -555,19 +552,17 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
/**
* Returns a Map of flowfile attributes from the response http headers.
Multivalue headers are naively converted to comma separated strings.
*/
- protected Map<String, String> convertAttributesFromHeaders(
- GenericApiGatewayResponse responseHttp) {
+ protected Map<String, String> convertAttributesFromHeaders(final
GenericApiGatewayResponse responseHttp) {
// create a new hashmap to store the values from the connection
- Map<String, String> map = new HashMap<>();
+ final Map<String, String> map = new HashMap<>();
responseHttp.getHttpResponse().getHeaders().entrySet().forEach((entry)
-> {
- String key = entry.getKey();
- String value = entry.getValue();
+ final String key = entry.getKey();
+ final String value = entry.getValue();
if (key == null) {
return;
}
-
// we ignore any headers with no actual values (rare)
if (StringUtils.isBlank(value)) {
return;
@@ -590,8 +585,8 @@ public abstract class AbstractAWSGatewayApiProcessor extends
throw new IllegalStateException("Unknown relationship " + name);
}
- protected void route(FlowFile request, FlowFile response, ProcessSession
session,
- ProcessContext context, int statusCode,
Set<Relationship> relationships) {
+ protected void route(FlowFile request, final FlowFile response, final
ProcessSession session,
+ final ProcessContext context, final int statusCode,
final Set<Relationship> relationships) {
// check if we should yield the processor
if (!isSuccess(statusCode) && request == null) {
context.yield();
@@ -609,12 +604,10 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
if (isSuccess(statusCode)) {
// we have two flowfiles to transfer
if (request != null) {
- session
- .transfer(request,
getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships));
+ session.transfer(request,
getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships));
}
if (response != null && !responseSent) {
- session
- .transfer(response,
getRelationshipForName(REL_RESPONSE_NAME, relationships));
+ session.transfer(response,
getRelationshipForName(REL_RESPONSE_NAME, relationships));
}
// 5xx -> RETRY
@@ -636,20 +629,20 @@ public abstract class AbstractAWSGatewayApiProcessor
extends
}
- protected boolean isSuccess(int statusCode) {
+ protected boolean isSuccess(final int statusCode) {
return statusCode / 100 == 2;
}
- protected void logRequest(ComponentLog logger, URI endpoint,
GenericApiGatewayRequest request) {
+ protected void logRequest(final ComponentLog logger, final URI endpoint,
final GenericApiGatewayRequest request) {
try {
logger.debug("\nRequest to remote service:\n\t{}\t{}\t\n{}",
new Object[]{endpoint.toURL().toExternalForm(),
request.getHttpMethod(), getLogString(request.getHeaders())});
- } catch (MalformedURLException e) {
+ } catch (final MalformedURLException e) {
logger.debug(e.getMessage());
}
}
- protected void logResponse(ComponentLog logger, GenericApiGatewayResponse
response) {
+ protected void logResponse(final ComponentLog logger, final
GenericApiGatewayResponse response) {
try {
logger.debug("\nResponse from remote service:\n\t{}\n{}",
new
Object[]{response.getHttpResponse().getHttpRequest().getURI().toURL().toExternalForm(),
getLogString(response.getHttpResponse().getHeaders())});
@@ -658,9 +651,9 @@ public abstract class AbstractAWSGatewayApiProcessor extends
}
}
- protected String getLogString(Map<String, String> map) {
- StringBuilder sb = new StringBuilder();
- if(map != null && map.size() > 0) {
+ protected String getLogString(final Map<String, String> map) {
+ final StringBuilder sb = new StringBuilder();
+ if (map != null && map.size() > 0) {
for (Map.Entry<String, String> entry : map.entrySet()) {
String value = entry.getValue();
sb.append("\t");
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
index 3c9f73b..34f712d 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
@@ -101,8 +101,8 @@ public class DeleteDynamoDB extends
AbstractWriteDynamoDBProcessor {
TableWriteItems tableWriteItems = new TableWriteItems(table);
for (FlowFile flowFile : flowFiles) {
- final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, flowFile);
- final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+ final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, flowFile.getAttributes());
+ final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue,
session, flowFile)) {
continue;
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
index 328dae6..14626ca 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
@@ -16,40 +16,44 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
-import java.io.ByteArrayInputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
-import com.amazonaws.services.dynamodbv2.model.AttributeValue;
-import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
@SupportsBatching
@SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
@@ -100,42 +104,97 @@ public class GetDynamoDB extends
AbstractDynamoDBProcessor {
}
@Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) {
- List<FlowFile> flowFiles =
session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
- if (flowFiles == null || flowFiles.size() == 0) {
- return;
+ public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new
ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+ final String table =
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+ final String jsonDocument =
context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
+
+ TableKeysAndAttributes tableKeysAndAttributes;
+
+ try {
+ tableKeysAndAttributes = getTableKeysAndAttributes(context,
attributes);
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.SUCCESSFUL)
+ .verificationStepName("Configure DynamoDB BatchGetItems
Request")
+ .explanation(String.format("Successfully configured
BatchGetItems Request"))
+ .build());
+ } catch (final IllegalArgumentException e) {
+ verificationLogger.error("Failed to configured BatchGetItems
Request", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.FAILED)
+ .verificationStepName("Configure DynamoDB BatchGetItems
Request")
+ .explanation(String.format("Failed to configured
BatchGetItems Request: " + e.getMessage()))
+ .build());
+ return results;
}
- Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+ if (tableKeysAndAttributes.getPrimaryKeys() == null ||
tableKeysAndAttributes.getPrimaryKeys().isEmpty()) {
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.SKIPPED)
+ .verificationStepName("Get DynamoDB Items")
+ .explanation(String.format("Skipped getting DynamoDB items
because no primary keys would be included in retrieval"))
+ .build());
+ } else {
+ try {
+ final DynamoDB dynamoDB =
getDynamoDB(getConfiguration(context).getClient());
+ int totalCount = 0;
+ int jsonDocumentCount = 0;
- final String table =
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
- TableKeysAndAttributes tableKeysAndAttributes = new
TableKeysAndAttributes(table);
+ BatchGetItemOutcome result =
dynamoDB.batchGetItem(tableKeysAndAttributes);
- final String hashKeyName =
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
- final String rangeKeyName =
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
- final String jsonDocument =
context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
+ // Handle processed items and get the json document
+ final List<Item> items = result.getTableItems().get(table);
+ for (final Item item : items) {
+ totalCount++;
+ if (item.get(jsonDocument) != null) {
+ jsonDocumentCount++;
+ }
+ }
- for (FlowFile flowFile : flowFiles) {
- final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, flowFile);
- final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.SUCCESSFUL)
+ .verificationStepName("Get DynamoDB Items")
+ .explanation(String.format("Successfully retrieved %s
items, including %s JSON documents, from DynamoDB", totalCount,
jsonDocumentCount))
+ .build());
- if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue,
session, flowFile)) {
- continue;
- }
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to retrieve items from
DynamoDB", e);
- if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue,
session, flowFile) ) {
- continue;
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.FAILED)
+ .verificationStepName("Get DynamoDB Items")
+ .explanation(String.format("Failed to retrieve items
from DynamoDB: %s", e.getMessage()))
+ .build());
}
+ }
- keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue),
flowFile);
+ return results;
+ }
- if ( rangeKeyValue == null ||
StringUtils.isBlank(rangeKeyValue.toString()) ) {
- tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName,
hashKeyValue);
- } else {
- tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName,
hashKeyValue, rangeKeyName, rangeKeyValue);
- }
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+ final List<FlowFile> flowFiles =
session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
+ if (flowFiles == null || flowFiles.size() == 0) {
+ return;
+ }
+
+ final Map<ItemKeys,FlowFile> keysToFlowFileMap =
getKeysToFlowFileMap(context, session, flowFiles);
+
+ final TableKeysAndAttributes tableKeysAndAttributes;
+ try {
+ tableKeysAndAttributes = getTableKeysAndAttributes(context,
flowFiles.stream()
+
.map(FlowFile::getAttributes).collect(Collectors.toList()).toArray(new Map[0]));
+ } catch (final IllegalArgumentException e) {
+ getLogger().error(e.getMessage(), e);
+ return;
}
+ final String table =
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+ final String hashKeyName =
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
+ final String rangeKeyName =
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
+ final String jsonDocument =
context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
+
if (keysToFlowFileMap.isEmpty()) {
return;
}
@@ -146,12 +205,12 @@ public class GetDynamoDB extends
AbstractDynamoDBProcessor {
BatchGetItemOutcome result =
dynamoDB.batchGetItem(tableKeysAndAttributes);
// Handle processed items and get the json document
- List<Item> items = result.getTableItems().get(table);
- for (Item item : items) {
- ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName),
item.get(rangeKeyName));
+ final List<Item> items = result.getTableItems().get(table);
+ for (final Item item : items) {
+ final ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName),
item.get(rangeKeyName));
FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
- if ( item.get(jsonDocument) != null ) {
+ if (item.get(jsonDocument) != null) {
ByteArrayInputStream bais = new
ByteArrayInputStream(item.getJSON(jsonDocument).getBytes());
flowFile = session.importFrom(bais, flowFile);
}
@@ -161,38 +220,85 @@ public class GetDynamoDB extends
AbstractDynamoDBProcessor {
}
// Handle unprocessed keys
- Map<String, KeysAndAttributes> unprocessedKeys =
result.getUnprocessedKeys();
+ final Map<String, KeysAndAttributes> unprocessedKeys =
result.getUnprocessedKeys();
if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
- KeysAndAttributes keysAndAttributes =
unprocessedKeys.get(table);
- List<Map<String, AttributeValue>> keys =
keysAndAttributes.getKeys();
+ final KeysAndAttributes keysAndAttributes =
unprocessedKeys.get(table);
+ final List<Map<String, AttributeValue>> keys =
keysAndAttributes.getKeys();
- for (Map<String,AttributeValue> unprocessedKey : keys) {
- Object hashKeyValue = getAttributeValue(context,
HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
- Object rangeKeyValue = getAttributeValue(context,
RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
+ for (final Map<String,AttributeValue> unprocessedKey : keys) {
+ final Object hashKeyValue = getAttributeValue(context,
HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
+ final Object rangeKeyValue = getAttributeValue(context,
RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
sendUnprocessedToUnprocessedRelationship(session,
keysToFlowFileMap, hashKeyValue, rangeKeyValue);
}
}
// Handle any remaining items
- for (ItemKeys key : keysToFlowFileMap.keySet()) {
+ for (final ItemKeys key : keysToFlowFileMap.keySet()) {
FlowFile flowFile = keysToFlowFileMap.get(key);
flowFile = session.putAttribute(flowFile,
DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE +
key.toString() );
session.transfer(flowFile,REL_NOT_FOUND);
keysToFlowFileMap.remove(key);
}
- } catch(AmazonServiceException exception) {
+ } catch(final AmazonServiceException exception) {
getLogger().error("Could not process flowFiles due to service
exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processServiceException(session,
flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
- } catch(AmazonClientException exception) {
+ } catch(final AmazonClientException exception) {
getLogger().error("Could not process flowFiles due to client
exception : " + exception.getMessage());
List<FlowFile> failedFlowFiles = processClientException(session,
flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
- } catch(Exception exception) {
+ } catch(final Exception exception) {
getLogger().error("Could not process flowFiles due to exception :
" + exception.getMessage());
List<FlowFile> failedFlowFiles = processException(session,
flowFiles, exception);
session.transfer(failedFlowFiles, REL_FAILURE);
}
}
+
+ private Map<ItemKeys, FlowFile> getKeysToFlowFileMap(final ProcessContext
context, final ProcessSession session, final List<FlowFile> flowFiles) {
+ final Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+
+ final String hashKeyName =
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
+ final String rangeKeyName =
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
+
+ for (final FlowFile flowFile : flowFiles) {
+ final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, flowFile.getAttributes());
+ final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
+
+ if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session,
flowFile)) {
+ continue;
+ }
+
+ if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue,
session, flowFile)) {
+ continue;
+ }
+
+ keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue),
flowFile);
+ }
+ return keysToFlowFileMap;
+ }
+
+ private TableKeysAndAttributes getTableKeysAndAttributes(final
ProcessContext context, final Map<String, String>... attributes) {
+ final String table =
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+ final TableKeysAndAttributes tableKeysAndAttributes = new
TableKeysAndAttributes(table);
+
+ final String hashKeyName =
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
+ final String rangeKeyName =
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
+
+ for (final Map<String, String> attributeMap : attributes) {
+ final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, attributeMap);
+ final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, attributeMap);
+
+ validateHashKeyValue(hashKeyValue);
+ validateRangeKeyValue(rangeKeyName, rangeKeyValue);
+
+ if (rangeKeyValue == null ||
StringUtils.isBlank(rangeKeyValue.toString())) {
+ tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName,
hashKeyValue);
+ } else {
+ tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName,
hashKeyValue, rangeKeyName, rangeKeyValue);
+ }
+ }
+ return tableKeysAndAttributes;
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
index 92e552a..23b8bab 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -118,8 +118,8 @@ public class PutDynamoDB extends
AbstractWriteDynamoDBProcessor {
TableWriteItems tableWriteItems = new TableWriteItems(table);
for (FlowFile flowFile : flowFiles) {
- final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, flowFile);
- final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+ final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE,
HASH_KEY_VALUE, flowFile.getAttributes());
+ final Object rangeKeyValue = getValue(context,
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session,
flowFile)) {
continue;
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 719f1cf..b9de5fe 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -16,16 +16,12 @@
*/
package org.apache.nifi.processors.aws.s3;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -37,23 +33,30 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.util.StandardValidators;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3Object;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
@SupportsBatching
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
@@ -149,6 +152,36 @@ public class FetchS3Object extends AbstractS3Processor {
}
@Override
+ public List<ConfigVerificationResult> verify(ProcessContext context,
ComponentLog verificationLogger, Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new
ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+ final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+ final String key =
context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+ final AmazonS3 client = getConfiguration(context).getClient();
+ final GetObjectMetadataRequest request =
createGetObjectMetadataRequest(context, attributes);
+
+ try {
+ final ObjectMetadata objectMetadata =
client.getObjectMetadata(request);
+ final long byteCount = objectMetadata.getContentLength();
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("HEAD S3 Object")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation(String.format("Successfully performed HEAD on
[%s] (%s bytes) from Bucket [%s]", key, byteCount, bucket))
+ .build());
+ } catch (final Exception e) {
+ getLogger().error(String.format("Failed to fetch [%s] from Bucket
[%s]", key, bucket), e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("HEAD S3 Object")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to perform HEAD on [%s]
from Bucket [%s]: %s", key, bucket, e.getMessage()))
+ .build());
+ }
+
+ return results;
+ }
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
@@ -156,48 +189,18 @@ public class FetchS3Object extends AbstractS3Processor {
}
final long startNanos = System.nanoTime();
- final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
- final String key =
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
- final String versionId =
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
- final boolean requesterPays =
context.getProperty(REQUESTER_PAYS).asBoolean();
- final long rangeStart = (context.getProperty(RANGE_START).isSet() ?
context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
: 0L);
- final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ?
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
: null);
-
- final AmazonS3 client = getClient();
- final GetObjectRequest request;
- if (versionId == null) {
- request = new GetObjectRequest(bucket, key);
- } else {
- request = new GetObjectRequest(bucket, key, versionId);
- }
- request.setRequesterPays(requesterPays);
-
- // tl;dr don't setRange(0) on GetObjectRequest because it results in
- // InvalidRange errors on zero byte objects.
- //
- // Amazon S3 sets byte ranges using HTTP Range headers as described in
- // https://datatracker.ietf.org/doc/html/rfc2616#section-14.35 and
- // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1. There
- // isn't a satisfiable byte range specification for zero length objects
- // so 416 (Request range not satisfiable) is returned.
- //
- // Since the effect of the byte range 0- is equivalent to not sending a
- // byte range and works for both zero and non-zero length objects,
- // the single argument setRange() only needs to be called when the
- // first byte position is greater than zero.
- if (rangeLength != null) {
- request.setRange(rangeStart, rangeStart + rangeLength - 1);
- } else if (rangeStart > 0) {
- request.setRange(rangeStart);
- }
final Map<String, String> attributes = new HashMap<>();
- AmazonS3EncryptionService encryptionService =
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+ final AmazonS3EncryptionService encryptionService =
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
if (encryptionService != null) {
- encryptionService.configureGetObjectRequest(request, new
ObjectMetadata());
attributes.put("s3.encryptionStrategy",
encryptionService.getStrategyName());
}
+ final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+ final String key =
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+ final AmazonS3 client = getClient();
+ final GetObjectRequest request = createGetObjectRequest(context,
flowFile.getAttributes());
try (final S3Object s3Object = client.getObject(request)) {
if (s3Object == null) {
@@ -272,6 +275,64 @@ public class FetchS3Object extends AbstractS3Processor {
session.getProvenanceReporter().fetch(flowFile, "http://" + bucket +
".amazonaws.com/" + key, transferMillis);
}
+ private GetObjectMetadataRequest createGetObjectMetadataRequest(final
ProcessContext context, final Map<String, String> attributes) {
+ final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+ final String key =
context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+ final String versionId =
context.getProperty(VERSION_ID).evaluateAttributeExpressions(attributes).getValue();
+ final boolean requesterPays =
context.getProperty(REQUESTER_PAYS).asBoolean();
+
+ final GetObjectMetadataRequest request;
+ if (versionId == null) {
+ request = new GetObjectMetadataRequest(bucket, key);
+ } else {
+ request = new GetObjectMetadataRequest(bucket, key, versionId);
+ }
+ request.setRequesterPays(requesterPays);
+ return request;
+ }
+
+ private GetObjectRequest createGetObjectRequest(final ProcessContext
context, final Map<String, String> attributes) {
+ final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+ final String key =
context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+ final String versionId =
context.getProperty(VERSION_ID).evaluateAttributeExpressions(attributes).getValue();
+ final boolean requesterPays =
context.getProperty(REQUESTER_PAYS).asBoolean();
+ final long rangeStart = (context.getProperty(RANGE_START).isSet() ?
context.getProperty(RANGE_START).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue()
: 0L);
+ final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ?
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue()
: null);
+
+ final GetObjectRequest request;
+ if (versionId == null) {
+ request = new GetObjectRequest(bucket, key);
+ } else {
+ request = new GetObjectRequest(bucket, key, versionId);
+ }
+ request.setRequesterPays(requesterPays);
+
+ // tl;dr don't setRange(0) on GetObjectRequest because it results in
+ // InvalidRange errors on zero byte objects.
+ //
+ // Amazon S3 sets byte ranges using HTTP Range headers as described in
+ // https://datatracker.ietf.org/doc/html/rfc2616#section-14.35 and
+ // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1. There
+ // isn't a satisfiable byte range specification for zero length objects
+ // so 416 (Request range not satisfiable) is returned.
+ //
+ // Since the effect of the byte range 0- is equivalent to not sending a
+ // byte range and works for both zero and non-zero length objects,
+ // the single argument setRange() only needs to be called when the
+ // first byte position is greater than zero.
+ if (rangeLength != null) {
+ request.setRange(rangeStart, rangeStart + rangeLength - 1);
+ } else if (rangeStart > 0) {
+ request.setRange(rangeStart);
+ }
+
+ final AmazonS3EncryptionService encryptionService =
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+ if (encryptionService != null) {
+ encryptionService.configureGetObjectRequest(request, new
ObjectMetadata());
+ }
+ return request;
+ }
+
protected void setFilePathAttributes(Map<String, String> attributes,
String filePathName) {
final int lastSlash = filePathName.lastIndexOf("/");
if (lastSlash > -1 && lastSlash < filePathName.length() - 1) {
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 5446650..5ee1a83 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -53,7 +53,6 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -901,14 +900,10 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog logger, final Map<String, String> attributes) {
- final ControllerService service =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
- final AmazonS3Client client = service != null ? createClient(context,
getCredentialsProvider(context), createConfiguration(context))
- : createClient(context, getCredentials(context),
createConfiguration(context));
+ final AmazonS3Client client = getConfiguration(context).getClient();
- getRegionAndInitializeEndpoint(context, client);
-
- final List<ConfigVerificationResult> results = new ArrayList<>();
- final String bucketName =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ final List<ConfigVerificationResult> results = new
ArrayList<>(super.verify(context, logger, attributes));
+ final String bucketName =
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
if (bucketName == null || bucketName.trim().isEmpty()) {
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
index d138dcc..8c6fc77 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
@@ -26,6 +26,8 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -41,6 +43,7 @@ import
org.apache.nifi.processors.aws.wag.client.GenericApiGatewayResponse;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -70,6 +73,8 @@ import java.util.concurrent.TimeUnit;
+ "of the Dynamic Property.")
public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
+ private static final Set<String> IDEMPOTENT_METHODS = new
HashSet<>(Arrays.asList("GET", "HEAD", "OPTIONS"));
+
public static final List<PropertyDescriptor> properties =
Collections.unmodifiableList(Arrays
.asList(
PROP_METHOD,
@@ -151,8 +156,8 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
}
@Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- ComponentLog logger = getLogger();
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final ComponentLog logger = getLogger();
FlowFile requestFlowFile = session.get();
// Checking to see if the property to put the body of the response in
an attribute was set
@@ -179,41 +184,20 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
final GenericApiGatewayClient client = getClient();
- final GenericApiGatewayRequest request = configureRequest(context,
session,
-
resourceName,
-
requestFlowFile);
-
- logRequest(logger, client.getEndpoint(), request);
final long startNanos = System.nanoTime();
- GenericApiGatewayResponse response = null;
- GenericApiGatewayException exception = null;
- try {
- response = client.execute(request);
- logResponse(logger, response);
- } catch (GenericApiGatewayException gag) {
- // ERROR response codes may come back as exceptions, 404 for
example
- exception = gag;
- }
+ final Map<String, String> attributes = requestFlowFile == null ?
Collections.emptyMap() : requestFlowFile.getAttributes();
+ final GatewayResponse gatewayResponse = invokeGateway(client,
context, session, requestFlowFile, attributes, logger);
- final int statusCode;
- if (exception != null) {
- statusCode = exception.getStatusCode();
- } else {
- statusCode = response.getHttpResponse().getStatusCode();
- }
+ final GenericApiGatewayResponse response =
gatewayResponse.response;
+ final GenericApiGatewayException exception =
gatewayResponse.exception;
+ final int statusCode = gatewayResponse.statusCode;
- if (statusCode == 0) {
- throw new IllegalStateException(
- "Status code unknown, connection hasn't been attempted.");
- }
final String endpoint =
context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
- boolean outputRegardless =
context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS)
+ final boolean outputRegardless =
context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS)
.asBoolean();
- boolean outputBodyToResponseContent = (isSuccess(statusCode) &&
!putToAttribute
- || outputRegardless);
- boolean outputBodyToRequestAttribute =
- (!isSuccess(statusCode) || putToAttribute) && requestFlowFile
!= null;
+ boolean outputBodyToResponseContent = (isSuccess(statusCode) &&
!putToAttribute || outputRegardless);
+ boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) ||
putToAttribute) && requestFlowFile != null;
boolean bodyExists = response != null && response.getBody() !=
null;
final String statusExplanation;
@@ -246,9 +230,7 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
if
(context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean()) {
// write the response headers as attributes
// this will overwrite any existing flowfile attributes
- requestFlowFile =
session.putAllAttributes(requestFlowFile,
-
convertAttributesFromHeaders(
-
response));
+ requestFlowFile =
session.putAllAttributes(requestFlowFile,
convertAttributesFromHeaders(response));
}
} else {
responseFlowFile = session.create();
@@ -282,8 +264,7 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
responseFlowFile);
// emit provenance event
- final long millis = TimeUnit.NANOSECONDS
- .toMillis(System.nanoTime() - startNanos);
+ final long millis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (requestFlowFile != null) {
session.getProvenanceReporter().fetch(responseFlowFile, endpoint, millis);
} else {
@@ -317,7 +298,7 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
if (attributeKey == null) {
attributeKey = RESPONSE_BODY;
}
- byte[] outputBuffer;
+ final byte[] outputBuffer;
int size = 0;
outputBuffer = new byte[maxAttributeSize];
if (bodyExists) {
@@ -340,17 +321,13 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
requestFlowFile = session.putAllAttributes(requestFlowFile,
statusAttributes);
final long millis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-
session.getProvenanceReporter().modifyAttributes(requestFlowFile,
- "The " +
attributeKey
- + " has
been added. The value of which is the body of a http call to "
- +
endpoint + resourceName
- + ". It
took " + millis
- +
"millis,");
+
session.getProvenanceReporter().modifyAttributes(requestFlowFile, String
+ .format("The %s has been added. The value of which is
the body of a http call to %s%s. It took %s millis,", attributeKey, endpoint,
resourceName, millis));
}
route(requestFlowFile, responseFlowFile, session, context,
statusCode,
getRelationships());
- } catch (Exception e) {
+ } catch (final Exception e) {
// penalize or yield
if (requestFlowFile != null) {
logger.error("Routing to {} due to exception: {}",
@@ -364,8 +341,7 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
session.transfer(requestFlowFile,
getRelationshipForName(REL_FAILURE_NAME,
getRelationships()));
} else {
- logger.error(
- "Yielding processor due to exception encountered as a
source processor: {}", e);
+ logger.error("Yielding processor due to exception encountered
as a source processor: {}", e);
context.yield();
}
@@ -380,4 +356,92 @@ public class InvokeAWSGatewayApi extends
AbstractAWSGatewayApiProcessor {
}
}
}
+
+ @Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new
ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+ final String method = context.getProperty(PROP_METHOD).getValue();
+
+ if (!IDEMPOTENT_METHODS.contains(method)) {
+ return results;
+ }
+
+ final String endpoint =
context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
+ final String resource =
context.getProperty(PROP_RESOURCE_NAME).getValue();
+ try {
+ final GenericApiGatewayClient client =
getConfiguration(context).getClient();
+
+ final GatewayResponse gatewayResponse = invokeGateway(client,
context, null, null, attributes, verificationLogger);
+
+ final String explanation;
+ if (gatewayResponse.exception != null) {
+ final String statusExplanation =
EnglishReasonPhraseCatalog.INSTANCE.getReason(gatewayResponse.statusCode, null);
+ explanation = String.format("Successfully invoked AWS Gateway
API [%s %s/%s] with blank request body, receiving error response [%s] with
status code [%s]",
+ method, endpoint, resource, statusExplanation,
gatewayResponse.statusCode);
+ } else {
+ final String statusExplanation =
gatewayResponse.response.getHttpResponse().getStatusText();
+ explanation = String.format("Successfully invoked AWS Gateway
API [%s %s%/s] with blank request body, receiving success response [%s] with
status code [%s]",
+ method, endpoint, resource, statusExplanation,
gatewayResponse.statusCode);
+ }
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.SUCCESSFUL)
+ .verificationStepName("Invoke AWS Gateway API")
+ .explanation(explanation)
+ .build());
+
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to invoke AWS Gateway API " +
endpoint, e);
+ results.add(new ConfigVerificationResult.Builder()
+ .outcome(Outcome.FAILED)
+ .verificationStepName("Invoke AWS Gateway API")
+ .explanation(String.format("Failed to invoke AWS Gateway
API [%s %s/%s]: %s", method, endpoint, resource, e.getMessage()))
+ .build());
+ }
+
+ return results;
+ }
+
+ private GatewayResponse invokeGateway(final GenericApiGatewayClient
client, final ProcessContext context, final ProcessSession session,
+ final FlowFile requestFlowFile,
final Map<String, String> attributes, final ComponentLog logger) {
+ final String resourceName =
context.getProperty(PROP_RESOURCE_NAME).getValue();
+
+ final GenericApiGatewayRequest request = configureRequest(context,
session, resourceName, requestFlowFile, attributes);
+
+ logRequest(logger, client.getEndpoint(), request);
+ GenericApiGatewayResponse response = null;
+ GenericApiGatewayException exception = null;
+ try {
+ response = client.execute(request);
+ logResponse(logger, response);
+ } catch (final GenericApiGatewayException gag) {
+ // ERROR response codes may come back as exceptions, 404 for
example
+ exception = gag;
+ }
+
+ final int statusCode;
+ if (exception != null) {
+ statusCode = exception.getStatusCode();
+ } else {
+ statusCode = response.getHttpResponse().getStatusCode();
+ }
+
+ if (statusCode == 0) {
+ throw new IllegalStateException(
+ "Status code unknown, connection hasn't been attempted.");
+ }
+ return new GatewayResponse(response, exception, statusCode);
+ }
+
+ private class GatewayResponse {
+ private final GenericApiGatewayResponse response;
+ private final GenericApiGatewayException exception;
+ private final int statusCode;
+
+ private GatewayResponse(final GenericApiGatewayResponse response,
final GenericApiGatewayException exception, final int statusCode) {
+ this.response = response;
+ this.exception = exception;
+ this.statusCode = statusCode;
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
index 42d56a8..c380248 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
@@ -16,35 +16,42 @@
*/
package org.apache.nifi.processors.aws.dynamodb;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
-import static
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.BatchGetItemResult;
import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
+import static
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class GetDynamoDBTest extends AbstractDynamoDBTest {
protected GetDynamoDB getDynamoDB;
@@ -79,13 +86,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
};
- getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return mockDynamoDB;
- }
- };
-
+ getDynamoDB = mockDynamoDB(mockDynamoDB);
}
@Test
@@ -105,6 +106,9 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.run(1);
+ // No actual items returned
+ assertVerificationResults(getRunner, 0, 0);
+
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED,
1);
List<MockFlowFile> flowFiles =
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
@@ -144,12 +148,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
};
- getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return mockDynamoDB;
- }
- };
+ getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
@@ -165,6 +164,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.run(1);
+ assertVerificationResults(getRunner, 1, 0);
+
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS,
1);
List<MockFlowFile> flowFiles =
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
@@ -205,12 +206,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
};
- getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return mockDynamoDB;
- }
- };
+ getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
@@ -225,6 +221,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.enqueue(new byte[] {});
getRunner.run(1);
+ assertVerificationResults(getRunner, 1, 1);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS,
1);
}
@@ -239,12 +236,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
};
- final GetDynamoDB getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return mockDynamoDB;
- }
- };
+ final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
@@ -261,6 +253,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.run(1);
+ assertVerificationFailure(getRunner);
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
1);
List<MockFlowFile> flowFiles =
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
@@ -281,12 +274,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
};
- final GetDynamoDB getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return mockDynamoDB;
- }
- };
+ final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
@@ -303,6 +291,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.run(1);
+ assertVerificationFailure(getRunner);
+
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
1);
List<MockFlowFile> flowFiles =
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
@@ -322,12 +312,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
};
- final GetDynamoDB getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return mockDynamoDB;
- }
- };
+ final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
@@ -344,6 +329,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.run(1);
+ assertVerificationFailure(getRunner);
+
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
1);
List<MockFlowFile> flowFiles =
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
for (MockFlowFile flowFile : flowFiles) {
@@ -370,12 +357,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
}
};
- final GetDynamoDB getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return notFoundMockDynamoDB;
- }
- };
+ final GetDynamoDB getDynamoDB = mockDynamoDB(notFoundMockDynamoDB);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
@@ -391,6 +373,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.run(1);
+ assertVerificationResults(getRunner, 0, 0);
+
getRunner.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1);
List<MockFlowFile> flowFiles =
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
@@ -407,12 +391,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
// When writing, mock thrown service exception from AWS
Mockito.when(mockDynamoDb.batchGetItem(ArgumentMatchers.<TableKeysAndAttributes>any())).thenThrow(getSampleAwsServiceException());
- getDynamoDB = new GetDynamoDB() {
- @Override
- protected DynamoDB getDynamoDB() {
- return mockDynamoDb;
- }
- };
+ getDynamoDB = mockDynamoDB(mockDynamoDb);
final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
@@ -427,6 +406,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
getRunner.run(1);
+ assertVerificationFailure(getRunner);
+
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE,
1);
List<MockFlowFile> flowFiles =
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
@@ -540,4 +521,41 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED));
}
}
+
+ private GetDynamoDB mockDynamoDB(final DynamoDB mockDynamoDB) {
+ return new GetDynamoDB() {
+ @Override
+ protected DynamoDB getDynamoDB() {
+ return mockDynamoDB;
+ }
+ @Override
+ protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) {
+ return mockDynamoDB;
+ }
+
+ @Override
+ protected
AbstractAWSProcessor<AmazonDynamoDBClient>.AWSConfiguration
getConfiguration(final ProcessContext context) {
+ final AmazonDynamoDBClient client =
Mockito.mock(AmazonDynamoDBClient.class);
+ return new AWSConfiguration(client, region);
+ }
+ };
+ }
+
+ private void assertVerificationFailure(final TestRunner runner) {
+ final List<ConfigVerificationResult> results = ((VerifiableProcessor)
runner.getProcessor())
+ .verify(runner.getProcessContext(), runner.getLogger(),
Collections.emptyMap());
+ assertEquals(3, results.size());
+ assertEquals(Outcome.SUCCESSFUL, results.get(0).getOutcome());
+ assertEquals(Outcome.SUCCESSFUL, results.get(1).getOutcome());
+ assertEquals(Outcome.FAILED, results.get(2).getOutcome());
+ }
+
+ private void assertVerificationResults(final TestRunner runner, final int
expectedTotalCount, final int expectedJsonDocumentCount) {
+ final List<ConfigVerificationResult> results = ((VerifiableProcessor)
runner.getProcessor())
+ .verify(runner.getProcessContext(), runner.getLogger(),
Collections.emptyMap());
+ assertEquals(3, results.size());
+ results.forEach(result -> assertEquals(Outcome.SUCCESSFUL,
result.getOutcome()));
+ assertTrue(results.get(2).getExplanation().contains("retrieved " +
expectedTotalCount + " items"));
+
assertTrue(results.get(2).getExplanation().contains(expectedJsonDocumentCount +
" JSON"));
+ }
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
index bbd341c..e98fc55 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.Collections;
import java.util.List;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -79,6 +81,11 @@ public class ITPutGetDeleteGetDynamoDBTest extends
ITAbstractDynamoDBTest {
assertEquals(document, new String(flowFile.toByteArray()));
}
+ final GetDynamoDB getDynamoDB = (GetDynamoDB) getRunner.getProcessor();
+ final List<ConfigVerificationResult> results =
getDynamoDB.verify(getRunner.getProcessContext(), getRunner.getLogger(),
Collections.emptyMap());
+ assertEquals(2, results.size());
+ assertEquals("Successfully retrieved 1 items, including 1 JSON
documents, from DynamoDB", results.get(1).getExplanation());
+
final TestRunner deleteRunner =
TestRunners.newTestRunner(DeleteDynamoDB.class);
deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE,
CREDENTIALS_FILE);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 25a1e4d..71915a1 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -22,9 +22,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -45,6 +48,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestFetchS3Object {
@@ -56,12 +62,17 @@ public class TestFetchS3Object {
@Before
public void setUp() {
- mockS3Client = Mockito.mock(AmazonS3Client.class);
+ mockS3Client = mock(AmazonS3Client.class);
mockFetchS3Object = new FetchS3Object() {
protected AmazonS3Client getClient() {
actualS3Client = client;
return mockS3Client;
}
+
+ @Override
+ protected AbstractAWSProcessor<AmazonS3Client>.AWSConfiguration
getConfiguration(ProcessContext context) {
+ return new AWSConfiguration(mockS3Client, null);
+ }
};
runner = TestRunners.newTestRunner(mockFetchS3Object);
}
@@ -90,12 +101,21 @@ public class TestFetchS3Object {
userMetadata.put("userKey2", "userValue2");
metadata.setUserMetadata(userMetadata);
metadata.setSSEAlgorithm("testAlgorithm");
- Mockito.when(metadata.getETag()).thenReturn("test-etag");
+ when(metadata.getETag()).thenReturn("test-etag");
s3ObjectResponse.setObjectMetadata(metadata);
-
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+
when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+
+ final long mockSize = 20L;
+ final ObjectMetadata objectMetadata = mock(ObjectMetadata.class);
+ when(objectMetadata.getContentLength()).thenReturn(mockSize);
+ when(mockS3Client.getObjectMetadata(any())).thenReturn(objectMetadata);
runner.run(1);
+ final List<ConfigVerificationResult> results =
mockFetchS3Object.verify(runner.getProcessContext(), runner.getLogger(), attrs);
+ assertEquals(2, results.size());
+ results.forEach(result ->
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, result.getOutcome()));
+
ArgumentCaptor<GetObjectRequest> captureRequest =
ArgumentCaptor.forClass(GetObjectRequest.class);
Mockito.verify(mockS3Client,
Mockito.times(1)).getObject(captureRequest.capture());
GetObjectRequest request = captureRequest.getValue();
@@ -148,9 +168,9 @@ public class TestFetchS3Object {
userMetadata.put("userKey2", "userValue2");
metadata.setUserMetadata(userMetadata);
metadata.setSSEAlgorithm("testAlgorithm");
- Mockito.when(metadata.getETag()).thenReturn("test-etag");
+ when(metadata.getETag()).thenReturn("test-etag");
s3ObjectResponse.setObjectMetadata(metadata);
-
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+
when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
runner.run(1);
@@ -196,9 +216,9 @@ public class TestFetchS3Object {
s3ObjectResponse.setObjectContent(new StringInputStream("Some
Content"));
ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class);
metadata.setContentDisposition("key/path/to/file.txt");
- Mockito.when(metadata.getVersionId()).thenReturn("response-version");
+ when(metadata.getVersionId()).thenReturn("response-version");
s3ObjectResponse.setObjectMetadata(metadata);
-
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+
when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
runner.run(1);
@@ -242,7 +262,7 @@ public class TestFetchS3Object {
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "request-key");
runner.enqueue(new byte[0], attrs);
- Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(null);
+ when(mockS3Client.getObject(Mockito.any())).thenReturn(null);
runner.run(1);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 2942f23..275f0cc 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -128,7 +128,9 @@ public class TestListS3 {
final List<ConfigVerificationResult> results = ((VerifiableProcessor)
runner.getProcessor())
.verify(runner.getProcessContext(), runner.getLogger(),
Collections.emptyMap());
- assertTrue(results.get(0).getExplanation().contains("finding 3
objects"));
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
results.get(0).getOutcome());
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
results.get(1).getOutcome());
+ assertTrue(results.get(1).getExplanation().contains("finding 3
objects"));
}
@Test
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
index 1179671..76a5bf9 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
@@ -16,17 +16,9 @@
*/
package org.apache.nifi.processors.aws.wag;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.times;
-
import com.amazonaws.ClientConfiguration;
import com.amazonaws.http.AmazonHttpClient;
import com.amazonaws.http.apache.client.impl.SdkHttpClient;
-import java.io.ByteArrayInputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.client.methods.HttpUriRequest;
@@ -43,6 +35,15 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.times;
+
public class TestInvokeAmazonGatewayApiMock {
private TestRunner runner = null;