This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ca530f4 NIFI-9317: Updating config verification for ListS3 (#5485)
ca530f4 is described below
commit ca530f40d8ef25a7c6834c96de982accb75368ee
Author: Joe Gresock <[email protected]>
AuthorDate: Wed Oct 27 13:47:58 2021 -0400
NIFI-9317: Updating config verification for ListS3 (#5485)
---
.../nifi/processors/aws/AbstractAWSProcessor.java | 22 ++++--
.../org/apache/nifi/processors/aws/s3/ListS3.java | 92 +++++++++++++++-------
.../apache/nifi/processors/aws/s3/TestListS3.java | 15 ++++
3 files changed, 92 insertions(+), 37 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 36e6a59..6609beb 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -303,17 +303,24 @@ public abstract class AbstractAWSProcessor<ClientType
extends AmazonWebServiceCl
public abstract void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException;
protected void initializeRegionAndEndpoint(final ProcessContext context,
final AmazonWebServiceClient client) {
+ this.region = getRegionAndInitializeEndpoint(context, client);
+ }
+
+ protected Region getRegionAndInitializeEndpoint(final ProcessContext
context, final AmazonWebServiceClient client) {
+ final Region region;
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
- final String region = context.getProperty(REGION).getValue();
- if (region != null) {
- this.region = Region.getRegion(Regions.fromName(region));
+ final String regionValue = context.getProperty(REGION).getValue();
+ if (regionValue != null) {
+ region = Region.getRegion(Regions.fromName(regionValue));
if (client != null) {
- client.setRegion(this.region);
+ client.setRegion(region);
}
} else {
- this.region = null;
+ region = null;
}
+ } else {
+ region = null;
}
// if the endpoint override has been configured, set the endpoint.
@@ -328,8 +335,8 @@ public abstract class AbstractAWSProcessor<ClientType
extends AmazonWebServiceCl
// handling vpce endpoints
// falling back to the configured region if the parse fails
// e.g. in case of
https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
- String region = parseRegionForVPCE(urlstr,
this.region.getName());
- client.setEndpoint(urlstr, this.client.getServiceName(),
region);
+ String regionValue = parseRegionForVPCE(urlstr,
region.getName());
+ client.setEndpoint(urlstr, this.client.getServiceName(),
regionValue);
} else {
// handling non-vpce custom endpoints where the AWS
library can parse the region out
// e.g. https://sqs.{region}.***.***.***.gov
@@ -337,6 +344,7 @@ public abstract class AbstractAWSProcessor<ClientType
extends AmazonWebServiceCl
}
}
}
+ return region;
}
/*
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 8ab51fe..5446650 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -53,6 +53,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -342,41 +343,23 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
return;
}
+ final AmazonS3 client = getClient();
+
+ S3BucketLister bucketLister = getS3BucketLister(context, client);
+
final long startNanos = System.nanoTime();
- final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
final long listingTimestamp = System.currentTimeMillis();
- final boolean requesterPays =
context.getProperty(REQUESTER_PAYS).asBoolean();
+
+ final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ListingSnapshot currentListing = listing.get();
final long currentTimestamp = currentListing.getTimestamp();
final Set<String> currentKeys = currentListing.getKeys();
-
- final AmazonS3 client = getClient();
int listCount = 0;
int totalListCount = 0;
long latestListedTimestampInThisCycle = currentTimestamp;
- String delimiter = context.getProperty(DELIMITER).getValue();
- String prefix =
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
-
- boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
- int listType = context.getProperty(LIST_TYPE).asInteger();
- S3BucketLister bucketLister = useVersions
- ? new S3VersionBucketLister(client)
- : listType == 2
- ? new S3ObjectBucketListerVersion2(client)
- : new S3ObjectBucketLister(client);
-
- bucketLister.setBucketName(bucket);
- bucketLister.setRequesterPays(requesterPays);
-
- if (delimiter != null && !delimiter.isEmpty()) {
- bucketLister.setDelimiter(delimiter);
- }
- if (prefix != null && !prefix.isEmpty()) {
- bucketLister.setPrefix(prefix);
- }
VersionListing versionListing;
final Set<String> listedKeys = new HashSet<>();
@@ -486,6 +469,33 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
}
}
+ /**
+ * Builds the S3BucketLister used to page through the bucket, configured
+ * from this processor's properties. Called from both onTrigger and verify,
+ * so the verification listing uses the same settings as a real run.
+ *
+ * Implementation choice: USE_VERSIONS=true yields S3VersionBucketLister;
+ * otherwise LIST_TYPE 2 yields S3ObjectBucketListerVersion2 and any other
+ * LIST_TYPE value yields S3ObjectBucketLister.
+ *
+ * BUCKET and PREFIX are evaluated with evaluateAttributeExpressions() and
+ * no FlowFile attributes.
+ *
+ * @param context process context supplying BUCKET, PREFIX, DELIMITER,
+ * REQUESTER_PAYS, USE_VERSIONS and LIST_TYPE
+ * @param client S3 client passed to the chosen lister implementation
+ * @return a lister with bucket name and requester-pays set, plus delimiter
+ * and prefix when those are non-null and non-empty
+ */
+ private S3BucketLister getS3BucketLister(final ProcessContext context,
final AmazonS3 client) {
+ // NOTE(review): asBoolean() results are unboxed into primitives here; this
+ // assumes REQUESTER_PAYS and USE_VERSIONS always resolve to a value
+ // (e.g. via property defaults) - confirm.
+ final boolean requesterPays =
context.getProperty(REQUESTER_PAYS).asBoolean();
+ final boolean useVersions =
context.getProperty(USE_VERSIONS).asBoolean();
+
+ final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ // NOTE(review): DELIMITER is read without evaluateAttributeExpressions(),
+ // unlike BUCKET and PREFIX - confirm this is intended.
+ final String delimiter = context.getProperty(DELIMITER).getValue();
+ final String prefix =
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+
+ final int listType = context.getProperty(LIST_TYPE).asInteger();
+
+ // Versioned listing takes precedence; LIST_TYPE is only consulted when
+ // USE_VERSIONS is false.
+ final S3BucketLister bucketLister = useVersions
+ ? new S3VersionBucketLister(client)
+ : listType == 2
+ ? new S3ObjectBucketListerVersion2(client)
+ : new S3ObjectBucketLister(client);
+
+ bucketLister.setBucketName(bucket);
+ bucketLister.setRequesterPays(requesterPays);
+
+ // An empty delimiter or prefix is treated as unset and not passed on.
+ if (delimiter != null && !delimiter.isEmpty()) {
+ bucketLister.setDelimiter(delimiter);
+ }
+ if (prefix != null && !prefix.isEmpty()) {
+ bucketLister.setPrefix(prefix);
+ }
+ return bucketLister;
+ }
private interface S3BucketLister {
void setBucketName(String bucketName);
@@ -891,11 +901,15 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context,
final ComponentLog logger, final Map<String, String> attributes) {
- final AmazonS3Client client = createClient(context,
getCredentials(context), createConfiguration(context));
- initializeRegionAndEndpoint(context, client);
+ final ControllerService service =
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+ final AmazonS3Client client = service != null ? createClient(context,
getCredentialsProvider(context), createConfiguration(context))
+ : createClient(context, getCredentials(context),
createConfiguration(context));
+
+ getRegionAndInitializeEndpoint(context, client);
final List<ConfigVerificationResult> results = new ArrayList<>();
final String bucketName =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+ final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
if (bucketName == null || bucketName.trim().isEmpty()) {
results.add(new ConfigVerificationResult.Builder()
@@ -907,17 +921,35 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
return results;
}
- final String prefix = context.getProperty(PREFIX).getValue();
+ final S3BucketLister bucketLister = getS3BucketLister(context, client);
+ final long listingTimestamp = System.currentTimeMillis();
// Attempt to perform a listing of objects in the S3 bucket
try {
- final ObjectListing listing = client.listObjects(bucketName,
prefix);
- final int count = listing.getObjectSummaries().size();
+ int listCount = 0;
+ int totalListCount = 0;
+ VersionListing versionListing;
+ do {
+ versionListing = bucketLister.listVersions();
+ for (final S3VersionSummary versionSummary :
versionListing.getVersionSummaries()) {
+ long lastModified =
versionSummary.getLastModified().getTime();
+ if (lastModified > (listingTimestamp -
minAgeMilliseconds)) {
+ continue;
+ }
+
+ listCount++;
+ }
+ bucketLister.setNextMarker();
+
+ totalListCount += listCount;
+
+ listCount = 0;
+ } while (bucketLister.isTruncated());
results.add(new ConfigVerificationResult.Builder()
.verificationStepName("Perform Listing")
.outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully listed contents of bucket '" +
bucketName + "', finding " + count + " objects" + (prefix == null ? "" : " with
a prefix of '" + prefix + "'"))
+ .explanation("Successfully listed contents of bucket '" +
bucketName + "', finding " + totalListCount + " objects matching the filter")
.build());
logger.info("Successfully verified configuration");
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index a05c9e3..2942f23 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.aws.s3;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@@ -28,7 +30,10 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import org.apache.commons.lang3.time.DateUtils;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.state.MockStateManager;
@@ -44,6 +49,7 @@ import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -66,6 +72,11 @@ public class TestListS3 {
protected AmazonS3Client getClient() {
return mockS3Client;
}
+
+ /**
+ * Returns the shared mock instead of building a real client. verify()
+ * constructs its own client via createClient(context, credentials, config)
+ * when no credentials provider service is configured, so this makes the
+ * verification listing run against mockS3Client.
+ * NOTE(review): only this AWSCredentials overload is stubbed here; a test
+ * that configures a credentials provider service would presumably need the
+ * provider-based createClient overload stubbed as well - confirm.
+ */
+ @Override
+ protected AmazonS3Client createClient(ProcessContext context,
AWSCredentials credentials, ClientConfiguration config) {
+ return mockS3Client;
+ }
};
runner = TestRunners.newTestRunner(mockListS3);
}
@@ -114,6 +125,10 @@ public class TestListS3 {
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
flowFiles.get(2).assertAttributeEquals("filename", "d/e");
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP,
lastModifiedTimestamp, Scope.CLUSTER);
+
+ final List<ConfigVerificationResult> results = ((VerifiableProcessor)
runner.getProcessor())
+ .verify(runner.getProcessContext(), runner.getLogger(),
Collections.emptyMap());
+ assertTrue(results.get(0).getExplanation().contains("finding 3
objects"));
}
@Test