This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 47c41ba8d7 NIFI-12732 ListS3 resets its tracking state after 
configuration change
47c41ba8d7 is described below

commit 47c41ba8d72d3f3494d42f6d4c32da53fc009a7b
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu Feb 1 13:16:42 2024 +0100

    NIFI-12732 ListS3 resets its tracking state after configuration change
    
    This closes #8360.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../org/apache/nifi/processors/aws/s3/ListS3.java  |  74 ++++--
 .../apache/nifi/processors/aws/s3/TestListS3.java  | 296 ++++++++++++++++++---
 2 files changed, 314 insertions(+), 56 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 431778f1ef..cd0de3cb2f 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -326,15 +326,27 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
 
     public static final Set<Relationship> relationships = 
Collections.singleton(REL_SUCCESS);
 
+    private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(
+                    BUCKET,
+                    REGION,
+                    PREFIX,
+                    LISTING_STRATEGY
+            ))
+    );
+
     public static final String CURRENT_TIMESTAMP = "currentTimestamp";
     public static final String CURRENT_KEY_PREFIX = "key-";
 
-    // State tracking
-    private final AtomicReference<ListingSnapshot> listing = new 
AtomicReference<>(new ListingSnapshot(0L, Collections.emptySet()));
+    // used by Tracking Timestamps tracking strategy
+    private final AtomicReference<ListingSnapshot> listing = new 
AtomicReference<>(ListingSnapshot.empty());
 
-    private volatile boolean justElectedPrimaryNode = false;
-    private volatile boolean resetEntityTrackingState = false;
+    // used by Tracking Entities tracking strategy
     private volatile 
ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> 
listedEntityTracker;
+
+    private volatile boolean justElectedPrimaryNode = false;
+    private volatile boolean resetTracking = false;
+
     private volatile Long minObjectAgeMilliseconds;
     private volatile Long maxObjectAgeMilliseconds;
 
@@ -343,26 +355,42 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
         justElectedPrimaryNode = (newState == 
PrimaryNodeState.ELECTED_PRIMARY_NODE);
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (isConfigurationRestored() && 
TRACKING_RESET_PROPERTIES.contains(descriptor)) {
+            resetTracking = true;
+        }
+    }
+
     @OnScheduled
-    public void initListedEntityTracker(ProcessContext context) {
-        final boolean isTrackingEntityStrategy = 
BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
-        if (listedEntityTracker != null && (resetEntityTrackingState || 
!isTrackingEntityStrategy)) {
+    public void initTrackingStrategy(ProcessContext context) throws 
IOException {
+        final String listingStrategy = 
context.getProperty(LISTING_STRATEGY).getValue();
+        final boolean isTrackingTimestampsStrategy = 
BY_TIMESTAMPS.getValue().equals(listingStrategy);
+        final boolean isTrackingEntityStrategy = 
BY_ENTITIES.getValue().equals(listingStrategy);
+
+        if (resetTracking || !isTrackingTimestampsStrategy) {
+            context.getStateManager().clear(Scope.CLUSTER);
+            listing.set(ListingSnapshot.empty());
+        }
+
+        if (listedEntityTracker != null && (resetTracking || 
!isTrackingEntityStrategy)) {
             try {
                 listedEntityTracker.clearListedEntities();
+                listedEntityTracker = null;
             } catch (IOException e) {
                 throw new RuntimeException("Failed to reset previously listed 
entities", e);
             }
         }
-        resetEntityTrackingState = false;
 
-        if (isTrackingEntityStrategy) {
-            if (listedEntityTracker == null) {
-                listedEntityTracker = createListedEntityTracker();
-            }
-        } else {
-            listedEntityTracker = null;
+        if (isTrackingEntityStrategy && listedEntityTracker == null) {
+            listedEntityTracker = createListedEntityTracker();
         }
 
+        resetTracking = false;
+    }
+
+    @OnScheduled
+    public void initObjectAgeThresholds(ProcessContext context) {
         minObjectAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         maxObjectAgeMilliseconds = context.getProperty(MAX_AGE) != null ? 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
     }
@@ -1175,7 +1203,7 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
         }
     }
 
-    private static class ListingSnapshot {
+    static class ListingSnapshot {
         private final long timestamp;
         private final Set<String> keys;
 
@@ -1191,6 +1219,10 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
         public Set<String> getKeys() {
             return keys;
         }
+
+        public static ListingSnapshot empty() {
+            return new ListingSnapshot(0L, Collections.emptySet());
+        }
     }
 
     @Override
@@ -1270,4 +1302,16 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
         versionSummary.setIsLatest(true);
         return versionSummary;
     }
+
+    ListingSnapshot getListingSnapshot() {
+        return listing.get();
+    }
+
+    ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> 
getListedEntityTracker() {
+        return listedEntityTracker;
+    }
+
+    boolean isResetTracking() {
+        return resetTracking;
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index aede9b2a55..b3228fd344 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.aws.s3;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@@ -30,8 +31,10 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.s3.model.S3VersionSummary;
 import com.amazonaws.services.s3.model.VersionListing;
 import org.apache.commons.lang3.time.DateUtils;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.reporting.InitializationException;
@@ -57,24 +60,38 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class TestListS3 {
 
-    private TestRunner runner = null;
-    private AmazonS3Client mockS3Client = null;
+    private static final long TEST_TIMESTAMP = 1234567890;
+
+    private TestRunner runner;
+    private ListS3 listS3;
+    private AmazonS3Client mockS3Client;
+    private MockStateManager mockStateManager;
+    private DistributedMapCacheClient mockCache;
 
     @BeforeEach
     public void setUp() {
-        mockS3Client = Mockito.mock(AmazonS3Client.class);
-        final ListS3 mockListS3 = new ListS3() {
+        mockS3Client = mock(AmazonS3Client.class);
+        listS3 = new ListS3() {
             @Override
             protected AmazonS3Client createClient(ProcessContext context, 
AWSCredentials credentials, ClientConfiguration config) {
                 return mockS3Client;
             }
         };
-        runner = TestRunners.newTestRunner(mockListS3);
+        runner = TestRunners.newTestRunner(listS3);
+        mockStateManager = runner.getStateManager();
+        mockCache = mock(DistributedMapCacheClient.class);
     }
 
 
@@ -100,16 +117,16 @@ public class TestListS3 {
         objectSummary3.setKey("d/e");
         objectSummary3.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
         assertFalse(request.isRequesterPays());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -157,16 +174,16 @@ public class TestListS3 {
         objectSummary3.setKey("d/e");
         objectSummary3.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
         assertFalse(request.isRequesterPays());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
 
@@ -204,16 +221,16 @@ public class TestListS3 {
         objectSummary3.setKey("d/e");
         objectSummary3.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
         assertTrue(request.isRequesterPays());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -260,16 +277,16 @@ public class TestListS3 {
         objectSummary3.setKey("d/e");
         objectSummary3.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<ListObjectsV2Request> captureRequest = 
ArgumentCaptor.forClass(ListObjectsV2Request.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjectsV2(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjectsV2(captureRequest.capture());
         ListObjectsV2Request request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
         assertFalse(request.isRequesterPays());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -307,16 +324,16 @@ public class TestListS3 {
         objectSummary3.setKey("d/e");
         objectSummary3.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<ListObjectsV2Request> captureRequest = 
ArgumentCaptor.forClass(ListObjectsV2Request.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjectsV2(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjectsV2(captureRequest.capture());
         ListObjectsV2Request request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
         assertTrue(request.isRequesterPays());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -350,15 +367,15 @@ public class TestListS3 {
         versionSummary2.setVersionId("2");
         versionSummary2.setLastModified(lastModified);
         versionListing.getVersionSummaries().add(versionSummary2);
-        
Mockito.when(mockS3Client.listVersions(Mockito.any(ListVersionsRequest.class))).thenReturn(versionListing);
+        
when(mockS3Client.listVersions(any(ListVersionsRequest.class))).thenReturn(versionListing);
 
         runner.run();
 
         ArgumentCaptor<ListVersionsRequest> captureRequest = 
ArgumentCaptor.forClass(ListVersionsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listVersions(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listVersions(captureRequest.capture());
         ListVersionsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listObjects(Mockito.any(ListObjectsRequest.class));
+        verify(mockS3Client, 
Mockito.never()).listObjects(any(ListObjectsRequest.class));
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 2);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -396,15 +413,15 @@ public class TestListS3 {
         objectSummary1.setKey("test-key");
         objectSummary1.setLastModified(objectLastModified);
         objectListing.getObjectSummaries().add(objectSummary1);
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0);
     }
@@ -435,7 +452,7 @@ public class TestListS3 {
         objectSummary3.setKey("now");
         objectSummary3.setLastModified(lastModifiedNow);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         Map<String,String> stateMap = new HashMap<>();
         String previousTimestamp = 
String.valueOf(lastModifiedMinus3Hour.getTime());
@@ -446,10 +463,10 @@ public class TestListS3 {
         runner.run();
 
         ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -485,7 +502,7 @@ public class TestListS3 {
         objectSummary3.setKey("now");
         objectSummary3.setLastModified(lastModifiedNow);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         Map<String,String> stateMap = new HashMap<>();
         String previousTimestamp = 
String.valueOf(lastModifiedMinus3Hour.getTime());
@@ -494,10 +511,10 @@ public class TestListS3 {
         runner.getStateManager().setState(stateMap, Scope.CLUSTER);
         runner.run();
         ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -523,17 +540,17 @@ public class TestListS3 {
         objectSummary1.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary1);
 
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<GetObjectTaggingRequest> captureRequest = 
ArgumentCaptor.forClass(GetObjectTaggingRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).getObjectTagging(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).getObjectTagging(captureRequest.capture());
         GetObjectTaggingRequest request = captureRequest.getValue();
 
         assertEquals("test-bucket", request.getBucketName());
         assertEquals("a", request.getKey());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
     }
 
     @Test
@@ -550,18 +567,18 @@ public class TestListS3 {
         objectSummary1.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary1);
 
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<GetObjectMetadataRequest> captureRequest = 
ArgumentCaptor.forClass(GetObjectMetadataRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).getObjectMetadata(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).getObjectMetadata(captureRequest.capture());
         GetObjectMetadataRequest request = captureRequest.getValue();
 
         assertEquals("test-bucket", request.getBucketName());
         assertEquals("a", request.getKey());
 
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
     }
 
     @Test
@@ -587,16 +604,16 @@ public class TestListS3 {
         objectSummary3.setKey("d/e");
         objectSummary3.setLastModified(lastModified);
         objectListing.getObjectSummaries().add(objectSummary3);
-        
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
 
         runner.run();
 
         ArgumentCaptor<ListObjectsRequest> captureRequest = 
ArgumentCaptor.forClass(ListObjectsRequest.class);
-        Mockito.verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
+        verify(mockS3Client, 
Mockito.times(1)).listObjects(captureRequest.capture());
         ListObjectsRequest request = captureRequest.getValue();
         assertEquals("test-bucket", request.getBucketName());
         assertFalse(request.isRequesterPays());
-        Mockito.verify(mockS3Client, 
Mockito.never()).listVersions(Mockito.any());
+        verify(mockS3Client, Mockito.never()).listVersions(any());
 
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
         List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -622,4 +639,201 @@ public class TestListS3 {
         runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
         runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP, 
null, Scope.CLUSTER);
     }
+
+    @Test
+    void testResetTimestampTrackingWhenBucketModified() throws Exception {
+        setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TEST_TIMESTAMP, 
listS3.getListingSnapshot().getTimestamp());
+
+        runner.setProperty(ListS3.BUCKET, "otherBucket");
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+        mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0", 
Scope.CLUSTER);
+
+        assertFalse(listS3.isResetTracking());
+    }
+    @Test
+    void testResetTimestampTrackingWhenRegionModified() throws Exception {
+        setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TEST_TIMESTAMP, 
listS3.getListingSnapshot().getTimestamp());
+
+        runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+        mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0", 
Scope.CLUSTER);
+
+        assertFalse(listS3.isResetTracking());
+    }
+
+    @Test
+    void testResetTimestampTrackingWhenPrefixModified() throws Exception {
+        setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TEST_TIMESTAMP, 
listS3.getListingSnapshot().getTimestamp());
+
+        runner.setProperty(ListS3.PREFIX, "prefix2");
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+        mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0", 
Scope.CLUSTER);
+
+        assertFalse(listS3.isResetTracking());
+    }
+
+    @Test
+    void testResetTimestampTrackingWhenStrategyModified() throws Exception {
+        setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TEST_TIMESTAMP, 
listS3.getListingSnapshot().getTimestamp());
+
+        runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+        mockStateManager.assertStateNotSet(ListS3.CURRENT_TIMESTAMP, 
Scope.CLUSTER);
+
+        assertFalse(listS3.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenBucketModified() throws Exception {
+        setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(listS3.getListedEntityTracker());
+
+        runner.setProperty(ListS3.BUCKET, "otherBucket");
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(listS3.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
listS3.getIdentifier()), any());
+
+        assertFalse(listS3.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenRegionModified() throws Exception {
+
+        setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(listS3.getListedEntityTracker());
+
+        runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(listS3.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
listS3.getIdentifier()), any());
+
+        assertFalse(listS3.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenPrefixModified() throws Exception {
+        setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(listS3.getListedEntityTracker());
+
+        runner.setProperty(ListS3.PREFIX, "prefix2");
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(listS3.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
listS3.getIdentifier()), any());
+
+        assertFalse(listS3.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenStrategyModified() throws Exception {
+        setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+        assertFalse(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(listS3.getListedEntityTracker());
+
+        runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
+
+        assertTrue(listS3.isResetTracking());
+
+        runner.run();
+
+        assertNull(listS3.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
listS3.getIdentifier()), any());
+
+        assertFalse(listS3.isResetTracking());
+    }
+
+    private void setUpResetTrackingTest(AllowableValue listingStrategy) throws 
Exception {
+        runner.setProperty(ListS3.BUCKET, "test-bucket");
+        runner.setProperty(ListS3.LISTING_STRATEGY, listingStrategy);
+        runner.setProperty(ListS3.PREFIX, "prefix1");
+
+        if (listingStrategy == ListS3.BY_TIMESTAMPS) {
+            Map<String, String> map = new HashMap<>();
+            map.put(ListS3.CURRENT_TIMESTAMP, Long.toString(TEST_TIMESTAMP));
+            map.put(ListS3.CURRENT_KEY_PREFIX + "0", "file");
+            mockStateManager.setState(map, Scope.CLUSTER);
+        } else if (listingStrategy == ListS3.BY_ENTITIES) {
+            String serviceId = "DistributedMapCacheClient";
+            when(mockCache.getIdentifier()).thenReturn(serviceId);
+            runner.addControllerService(serviceId, mockCache);
+            runner.enableControllerService(mockCache);
+            runner.setProperty(ListS3.TRACKING_STATE_CACHE, serviceId);
+        }
+
+        
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(new 
ObjectListing());
+    }
 }

Reply via email to