This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 6ce2e3799c NIFI-10349 add maximum object age property to list s3
6ce2e3799c is described below
commit 6ce2e3799c7525ccbd1112cfcfc66912640da5f3
Author: Marco Carlino <[email protected]>
AuthorDate: Thu Aug 11 17:31:21 2022 +0200
NIFI-10349 add maximum object age property to list s3
This closes #6293.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../org/apache/nifi/processors/aws/s3/ListS3.java | 33 ++++++++++++++-
.../apache/nifi/processors/aws/s3/TestListS3.java | 48 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 282cd0c354..d492708606 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -81,6 +81,7 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.FormatUtils;
import java.io.IOException;
import java.io.OutputStream;
@@ -220,6 +221,15 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
.defaultValue("0 sec")
.build();
+ public static final PropertyDescriptor MAX_AGE = new Builder()
+ .name("max-age")
+ .displayName("Maximum Object Age")
+ .description("The maximum age that an S3 object can be in order to
be considered; any object older than this amount of time (according to last
modification date) will be ignored")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .addValidator(createMaxAgeValidator())
+ .build();
+
public static final PropertyDescriptor WRITE_OBJECT_TAGS = new Builder()
.name("write-s3-object-tags")
.displayName("Write Object Tags")
@@ -284,6 +294,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
SECRET_KEY,
RECORD_WRITER,
MIN_AGE,
+ MAX_AGE,
BATCH_SIZE,
WRITE_OBJECT_TAGS,
WRITE_USER_METADATA,
@@ -362,6 +373,22 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
}
};
}
+ private static Validator createMaxAgeValidator() {
+ return new Validator() {
+ @Override
+ public ValidationResult validate(final String subject, final
String input, final ValidationContext context) {
+ Double maxAge = input != null ?
FormatUtils.getPreciseTimeDuration(input, TimeUnit.MILLISECONDS) : null;
+ long minAge =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ boolean valid = input != null && maxAge > minAge;
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(valid)
+ .explanation(valid ? null : "'Maximum Age' must be
greater than 'Minimum Age' ")
+ .build();
+ }
+ };
+ }
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -449,6 +476,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
final long startNanos = System.nanoTime();
final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
final long listingTimestamp = System.currentTimeMillis();
final String bucket =
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
@@ -481,6 +509,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
long lastModified =
versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp &&
currentKeys.contains(versionSummary.getKey())
+ || (maxAgeMilliseconds != null && (lastModified <
(listingTimestamp - maxAgeMilliseconds)))
|| lastModified > (listingTimestamp -
minAgeMilliseconds)) {
continue;
}
@@ -1103,6 +1132,7 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
final List<ConfigVerificationResult> results = new
ArrayList<>(super.verify(context, logger, attributes));
final String bucketName =
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
final long minAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
if (bucketName == null || bucketName.trim().isEmpty()) {
results.add(new ConfigVerificationResult.Builder()
@@ -1126,7 +1156,8 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
versionListing = bucketLister.listVersions();
for (final S3VersionSummary versionSummary :
versionListing.getVersionSummaries()) {
long lastModified =
versionSummary.getLastModified().getTime();
- if (lastModified > (listingTimestamp -
minAgeMilliseconds)) {
+ if ((maxAgeMilliseconds != null && (lastModified <
(listingTimestamp - maxAgeMilliseconds)))
+ || lastModified > (listingTimestamp - minAgeMilliseconds))
{
continue;
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 410cf9370f..01e259513c 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -463,6 +463,54 @@ public class TestListS3 {
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP,
lastModifiedTimestamp, Scope.CLUSTER);
}
+ @Test
+ public void testListIgnoreByMaxAge() throws IOException {
+ runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(ListS3.BUCKET, "test-bucket");
+ runner.setProperty(ListS3.MAX_AGE, "30 sec");
+ Date lastModifiedNow = new Date();
+ Date lastModifiedMinus1Hour = DateUtils.addHours(lastModifiedNow, -1);
+ Date lastModifiedMinus3Hour = DateUtils.addHours(lastModifiedNow, -3);
+ ObjectListing objectListing = new ObjectListing();
+ S3ObjectSummary objectSummary1 = new S3ObjectSummary();
+ objectSummary1.setBucketName("test-bucket");
+ objectSummary1.setKey("minus-3hour");
+ objectSummary1.setLastModified(lastModifiedMinus3Hour);
+ objectListing.getObjectSummaries().add(objectSummary1);
+ S3ObjectSummary objectSummary2 = new S3ObjectSummary();
+ objectSummary2.setBucketName("test-bucket");
+ objectSummary2.setKey("minus-1hour");
+ objectSummary2.setLastModified(lastModifiedMinus1Hour);
+ objectListing.getObjectSummaries().add(objectSummary2);
+ S3ObjectSummary objectSummary3 = new S3ObjectSummary();
+ objectSummary3.setBucketName("test-bucket");
+ objectSummary3.setKey("now");
+ objectSummary3.setLastModified(lastModifiedNow);
+ objectListing.getObjectSummaries().add(objectSummary3);
+
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
+ Map<String,String> stateMap = new HashMap<>();
+ String previousTimestamp =
String.valueOf(lastModifiedMinus3Hour.getTime());
+ stateMap.put(ListS3.CURRENT_TIMESTAMP, previousTimestamp);
+ stateMap.put(ListS3.CURRENT_KEY_PREFIX + "0", "minus-3hour");
+ runner.getStateManager().setState(stateMap, Scope.CLUSTER);
+ runner.run();
+ ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
+ Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ ListObjectsRequest request = captureRequest.getValue();
+ assertEquals("test-bucket", request.getBucketName());
+ Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+
+ runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+ List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+ MockFlowFile ff0 = flowFiles.get(0);
+ ff0.assertAttributeEquals("filename", "now");
+ ff0.assertAttributeEquals("s3.bucket", "test-bucket");
+ String lastModifiedTimestamp =
String.valueOf(lastModifiedNow.getTime());
+ ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
+ runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP,
lastModifiedTimestamp, Scope.CLUSTER);
+ }
+
@Test
public void testWriteObjectTags() {
runner.setProperty(ListS3.REGION, "eu-west-1");