This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ce2e3799c NIFI-10349 add maximum object age property to list s3
6ce2e3799c is described below

commit 6ce2e3799c7525ccbd1112cfcfc66912640da5f3
Author: Marco Carlino <[email protected]>
AuthorDate: Thu Aug 11 17:31:21 2022 +0200

    NIFI-10349 add maximum object age property to list s3
    
    This closes #6293.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../org/apache/nifi/processors/aws/s3/ListS3.java  | 33 ++++++++++++++-
 .../apache/nifi/processors/aws/s3/TestListS3.java  | 48 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 282cd0c354..d492708606 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -81,6 +81,7 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.FormatUtils;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -220,6 +221,15 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
             .defaultValue("0 sec")
             .build();
 
+    public static final PropertyDescriptor MAX_AGE = new Builder()
+            .name("max-age")
+            .displayName("Maximum Object Age")
+            .description("The maximum age that an S3 object can be in order to 
be considered; any object older than this amount of time (according to last 
modification date) will be ignored")
+            .required(false)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .addValidator(createMaxAgeValidator())
+            .build();
+
     public static final PropertyDescriptor WRITE_OBJECT_TAGS = new Builder()
             .name("write-s3-object-tags")
             .displayName("Write Object Tags")
@@ -284,6 +294,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
         SECRET_KEY,
         RECORD_WRITER,
         MIN_AGE,
+        MAX_AGE,
         BATCH_SIZE,
         WRITE_OBJECT_TAGS,
         WRITE_USER_METADATA,
@@ -362,6 +373,22 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
             }
         };
     }
+    private static Validator createMaxAgeValidator() {
+        return new Validator() {
+            @Override
+            public ValidationResult validate(final String subject, final 
String input, final ValidationContext context) {
+                Double  maxAge = input != null ? 
FormatUtils.getPreciseTimeDuration(input, TimeUnit.MILLISECONDS) : null;
+                long minAge = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+                boolean valid = input != null && maxAge > minAge;
+                return new ValidationResult.Builder()
+                        .input(input)
+                        .subject(subject)
+                        .valid(valid)
+                        .explanation(valid ? null : "'Maximum Age' must be 
greater than 'Minimum Age' ")
+                        .build();
+            }
+        };
+    }
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -449,6 +476,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
 
         final long startNanos = System.nanoTime();
         final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
         final long listingTimestamp = System.currentTimeMillis();
 
         final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
@@ -481,6 +509,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
                     long lastModified = 
versionSummary.getLastModified().getTime();
                     if (lastModified < currentTimestamp
                         || lastModified == currentTimestamp && 
currentKeys.contains(versionSummary.getKey())
+                        || (maxAgeMilliseconds != null && (lastModified < 
(listingTimestamp - maxAgeMilliseconds)))
                         || lastModified > (listingTimestamp - 
minAgeMilliseconds)) {
                         continue;
                     }
@@ -1103,6 +1132,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
         final List<ConfigVerificationResult> results = new 
ArrayList<>(super.verify(context, logger, attributes));
         final String bucketName = 
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
         final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
 
         if (bucketName == null || bucketName.trim().isEmpty()) {
             results.add(new ConfigVerificationResult.Builder()
@@ -1126,7 +1156,8 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
                 versionListing = bucketLister.listVersions();
                 for (final S3VersionSummary versionSummary : 
versionListing.getVersionSummaries()) {
                     long lastModified = 
versionSummary.getLastModified().getTime();
-                    if (lastModified > (listingTimestamp - 
minAgeMilliseconds)) {
+                    if ((maxAgeMilliseconds != null && (lastModified < 
(listingTimestamp - maxAgeMilliseconds)))
+                    || lastModified > (listingTimestamp - minAgeMilliseconds)) 
{
                         continue;
                     }
 
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 410cf9370f..01e259513c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -463,6 +463,54 @@ public class TestListS3 {
         runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, 
lastModifiedTimestamp, Scope.CLUSTER);
     }
 
+    @Test
+    public void testListIgnoreByMaxAge() throws IOException {
+        runner.setProperty(ListS3.REGION, "eu-west-1");
+        runner.setProperty(ListS3.BUCKET, "test-bucket");
+        runner.setProperty(ListS3.MAX_AGE, "30 sec");
+        Date lastModifiedNow = new Date();
+        Date lastModifiedMinus1Hour = DateUtils.addHours(lastModifiedNow, -1);
+        Date lastModifiedMinus3Hour = DateUtils.addHours(lastModifiedNow, -3);
+        ObjectListing objectListing = new ObjectListing();
+        S3ObjectSummary objectSummary1 = new S3ObjectSummary();
+        objectSummary1.setBucketName("test-bucket");
+        objectSummary1.setKey("minus-3hour");
+        objectSummary1.setLastModified(lastModifiedMinus3Hour);
+        objectListing.getObjectSummaries().add(objectSummary1);
+        S3ObjectSummary objectSummary2 = new S3ObjectSummary();
+        objectSummary2.setBucketName("test-bucket");
+        objectSummary2.setKey("minus-1hour");
+        objectSummary2.setLastModified(lastModifiedMinus1Hour);
+        objectListing.getObjectSummaries().add(objectSummary2);
+        S3ObjectSummary objectSummary3 = new S3ObjectSummary();
+        objectSummary3.setBucketName("test-bucket");
+        objectSummary3.setKey("now");
+        objectSummary3.setLastModified(lastModifiedNow);
+        objectListing.getObjectSummaries().add(objectSummary3);
+        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
+        Map<String,String> stateMap = new HashMap<>();
+        String previousTimestamp = 
String.valueOf(lastModifiedMinus3Hour.getTime());
+        stateMap.put(ListS3.CURRENT_TIMESTAMP, previousTimestamp);
+        stateMap.put(ListS3.CURRENT_KEY_PREFIX + "0", "minus-3hour");
+        runner.getStateManager().setState(stateMap, Scope.CLUSTER);
+        runner.run();
+        ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
+        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        ListObjectsRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("filename", "now");
+        ff0.assertAttributeEquals("s3.bucket", "test-bucket");
+        String lastModifiedTimestamp = 
String.valueOf(lastModifiedNow.getTime());
+        ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
+        runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, 
lastModifiedTimestamp, Scope.CLUSTER);
+    }
+
     @Test
     public void testWriteObjectTags() {
         runner.setProperty(ListS3.REGION, "eu-west-1");

Reply via email to