This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new e97431f060 NIFI-12936 ListGCSBucket resets its tracking state after configuration change
e97431f060 is described below

commit e97431f0604fccad84c79048758eed567db117e1
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Mar 25 19:19:49 2024 +0100

    NIFI-12936 ListGCSBucket resets its tracking state after configuration change
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #8563.
---
 .../nifi/processors/gcp/storage/ListGCSBucket.java |  63 +++-
 .../processors/gcp/storage/ListGCSBucketTest.java  | 409 ++++++++++++---------
 2 files changed, 281 insertions(+), 191 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 627f66654d..be18311356 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -69,6 +69,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -274,40 +275,64 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         return relationships;
     }
 
-    // State tracking
+    private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(
+                    BUCKET,
+                    PREFIX,
+                    LISTING_STRATEGY
+            ))
+    );
+
+    // used by Tracking Timestamps tracking strategy
     public static final String CURRENT_TIMESTAMP = "currentTimestamp";
     public static final String CURRENT_KEY_PREFIX = "key-";
     private volatile long currentTimestamp = 0L;
     private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>());
 
-    private volatile boolean justElectedPrimaryNode = false;
-    private volatile boolean resetEntityTrackingState = false;
+    // used by Tracking Entities tracking strategy
     private volatile ListedEntityTracker<ListableBlob> listedEntityTracker;
 
+    private volatile boolean justElectedPrimaryNode = false;
+    private volatile boolean resetTracking = false;
+
     @OnPrimaryNodeStateChange
     public void onPrimaryNodeChange(final PrimaryNodeState newState) {
         justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (isConfigurationRestored() && TRACKING_RESET_PROPERTIES.contains(descriptor)) {
+            resetTracking = true;
+        }
+    }
+
     @OnScheduled
-    public void initListedEntityTracker(ProcessContext context) {
-        final boolean isTrackingEntityStrategy = 
BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
-        if (listedEntityTracker != null && (resetEntityTrackingState || 
!isTrackingEntityStrategy)) {
+    public void initTrackingStrategy(ProcessContext context) throws 
IOException {
+        final String listingStrategy = 
context.getProperty(LISTING_STRATEGY).getValue();
+        final boolean isTrackingTimestampsStrategy = 
BY_TIMESTAMPS.getValue().equals(listingStrategy);
+        final boolean isTrackingEntityStrategy = 
BY_ENTITIES.getValue().equals(listingStrategy);
+
+        if (resetTracking || !isTrackingTimestampsStrategy) {
+            context.getStateManager().clear(Scope.CLUSTER);
+            currentTimestamp = 0L;
+            currentKeys.clear();
+        }
+
+        if (listedEntityTracker != null && (resetTracking || !isTrackingEntityStrategy)) {
             try {
                 listedEntityTracker.clearListedEntities();
+                listedEntityTracker = null;
             } catch (IOException e) {
                 throw new RuntimeException("Failed to reset previously listed entities", e);
             }
         }
-        resetEntityTrackingState = false;
 
-        if (isTrackingEntityStrategy) {
-            if (listedEntityTracker == null) {
-                listedEntityTracker = createListedEntityTracker();
-            }
-        } else {
-            listedEntityTracker = null;
+        if (isTrackingEntityStrategy && listedEntityTracker == null) {
+            listedEntityTracker = createListedEntityTracker();
         }
+
+        resetTracking = false;
     }
 
     protected ListedEntityTracker<ListableBlob> createListedEntityTracker() {
@@ -1026,4 +1051,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             return count;
         }
     }
+
+    long getCurrentTimestamp() {
+        return currentTimestamp;
+    }
+
+    ListedEntityTracker<ListableBlob> getListedEntityTracker() {
+        return listedEntityTracker;
+    }
+
+    boolean isResetTracking() {
+        return resetTracking;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index e7999ccefc..b0b19ac17e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -17,19 +17,24 @@
 package org.apache.nifi.processors.gcp.storage;
 
 import com.google.api.gax.paging.Page;
+import com.google.cloud.PageImpl;
 import com.google.cloud.storage.Acl;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.LogMessage;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -69,17 +74,20 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
  * Unit tests for {@link ListGCSBucket} which do not consume Google Cloud resources.
  */
 public class ListGCSBucketTest extends AbstractGCSTest {
+
     private static final String PREFIX = "test-prefix";
     private static final Boolean USE_GENERATIONS = true;
 
@@ -106,16 +114,29 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     private static final Long CREATE_TIME = 1234L;
     private static final Long UPDATE_TIME = 4567L;
     private final static Long GENERATION = 5L;
+    private static final long TIMESTAMP = 1234567890;
 
     @Mock
     Storage storage;
 
+    @Mock
+    Page<Blob> mockBlobPage;
+
     @Captor
     ArgumentCaptor<Storage.BlobListOption> argumentCaptor;
 
-    @Override
-    public ListGCSBucket getProcessor() {
-        return new ListGCSBucket() {
+    private TestRunner runner;
+
+    private ListGCSBucket processor;
+
+    private MockStateManager mockStateManager;
+
+    @Mock
+    private DistributedMapCacheClient mockCache;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        processor = new ListGCSBucket() {
             @Override
             protected Storage getCloudService() {
                 return storage;
@@ -126,21 +147,25 @@ public class ListGCSBucketTest extends AbstractGCSTest {
                 return storage;
             }
         };
+
+        runner = buildNewRunner(processor);
+        runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
+        runner.assertValid();
+
+        mockStateManager = runner.getStateManager();
+    }
+
+    @Override
+    public ListGCSBucket getProcessor() {
+        return processor;
     }
 
     @Override
     protected void addRequiredPropertiesToRunner(TestRunner runner) {
-        runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
     }
 
     @Test
     public void testRestoreFreshState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         assertEquals(-1L, 
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion(),
 "Cluster StateMap should be fresh (version -1L)");
         assertTrue(processor.getStateKeys().isEmpty());
 
@@ -150,17 +175,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals(0L, processor.getStateTimestamp());
 
         assertTrue(processor.getStateKeys().isEmpty());
-
     }
 
     @Test
     public void testRestorePreviousState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0");
@@ -181,12 +199,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testPersistState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         assertEquals(-1L,
                 
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion(),
                 "Cluster StateMap should be fresh (version -1L)"
@@ -208,13 +220,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testFailedPersistState() throws Exception {
-        reset(storage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testFailedPersistState() {
         runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true);
 
         final Set<String> keys = new HashSet<>(Arrays.asList("test-key-0", 
"test-key-1"));
@@ -234,52 +240,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
         // We could do more specific things like check the contents of the 
LogMessage,
         // but that seems too nitpicky.
-
-    }
-
-    @Mock
-    Page<Blob> mockBlobPage;
-
-    private Blob buildMockBlob(String bucket, String key, long updateTime) {
-        final Blob blob = mock(Blob.class);
-        when(blob.getBucket()).thenReturn(bucket);
-        when(blob.getName()).thenReturn(key);
-        when(blob.getUpdateTime()).thenReturn(updateTime);
-        return blob;
-    }
-
-    private Blob buildMockBlobWithoutBucket(String bucket, String key, long 
updateTime) {
-        final Blob blob = mock(Blob.class);
-        when(blob.getName()).thenReturn(key);
-        when(blob.getUpdateTime()).thenReturn(updateTime);
-        return blob;
-    }
-
-    private void verifyConfigVerification(final TestRunner runner, final 
ListGCSBucket processor, final int expectedCount) {
-        final List<ConfigVerificationResult> verificationResults = 
processor.verify(runner.getProcessContext(), runner.getLogger(), 
Collections.emptyMap());
-        assertEquals(3, verificationResults.size());
-        final ConfigVerificationResult cloudServiceResult = 
verificationResults.get(0);
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
cloudServiceResult.getOutcome());
-
-        final ConfigVerificationResult iamPermissionsResult = 
verificationResults.get(1);
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
iamPermissionsResult.getOutcome());
-
-        final ConfigVerificationResult listingResult = 
verificationResults.get(2);
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
listingResult.getOutcome());
-
-        assertTrue(
-                
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", 
expectedCount)),
-                String.format("Expected %s blobs to be counted, but 
explanation was: %s", expectedCount, listingResult.getExplanation()));
     }
 
     @Test
-    public void testSuccessfulList() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testSuccessfulList() {
         final Iterable<Blob> mockList = Arrays.asList(
                 buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
                 buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
@@ -316,11 +280,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testNoTrackingListing() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
+    public void testNoTrackingListing() {
         runner.setProperty(ListGCSBucket.LISTING_STRATEGY, 
ListGCSBucket.NO_TRACKING);
         runner.assertValid();
 
@@ -366,12 +326,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testOldValues() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Iterable<Blob> mockList = Collections.singletonList(
                 buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
         );
@@ -394,16 +348,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("2", 
runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP));
     }
 
-
-
     @Test
     public void testEmptyList() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Iterable<Blob> mockList = Collections.emptyList();
 
         when(mockBlobPage.getValues()).thenReturn(mockList);
@@ -423,12 +369,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndFilesComingInAlphabeticalOrder() throws 
Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");
@@ -467,12 +407,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws 
Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -516,12 +450,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndNewFilesComingWithTheSameTimestamp() 
throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -571,12 +499,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
 
     @Test
     public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp() 
throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
         final Map<String, String> state = new LinkedHashMap<>();
         state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
         state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -624,13 +546,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testAttributesSet() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAttributesSet() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         when(blob.getSize()).thenReturn(SIZE);
         when(blob.getCacheControl()).thenReturn(CACHE_CONTROL);
@@ -690,13 +606,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testAclOwnerUser() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerUser() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.User mockUser = mock(Acl.User.class);
         when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL);
@@ -719,15 +629,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("user", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
     @Test
-    public void testAclOwnerGroup() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerGroup() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.Group mockGroup = mock(Acl.Group.class);
         when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL);
@@ -750,16 +653,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("group", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
-
     @Test
-    public void testAclOwnerDomain() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerDomain() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.Domain mockDomain = mock(Acl.Domain.class);
         when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN);
@@ -781,16 +676,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("domain", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
-
     @Test
-    public void testAclOwnerProject() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testAclOwnerProject() {
         final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
         final Acl.Project mockProject = mock(Acl.Project.class);
         when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID);
@@ -813,15 +700,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals("project", flowFile.getAttribute(OWNER_TYPE_ATTR));
     }
 
-
     @Test
-    public void testYieldOnBadStateRestore() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-        runner.assertValid();
-
+    public void testYieldOnBadStateRestore() {
         final Iterable<Blob> mockList = Collections.emptyList();
 
         runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
@@ -833,12 +713,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
     }
 
     @Test
-    public void testListOptionsPrefix() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-
+    public void testListOptionsPrefix() {
         runner.setProperty(ListGCSBucket.PREFIX, PREFIX);
         runner.assertValid();
 
@@ -854,14 +729,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         assertEquals(Storage.BlobListOption.prefix(PREFIX), 
argumentCaptor.getValue());
     }
 
-
     @Test
-    public void testListOptionsVersions() throws Exception {
-        reset(storage, mockBlobPage);
-        final ListGCSBucket processor = getProcessor();
-        final TestRunner runner = buildNewRunner(processor);
-        addRequiredPropertiesToRunner(runner);
-
+    public void testListOptionsVersions() {
         runner.setProperty(ListGCSBucket.USE_GENERATIONS, 
String.valueOf(USE_GENERATIONS));
         runner.assertValid();
 
@@ -877,4 +746,188 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         Storage.BlobListOption option = argumentCaptor.getValue();
         assertEquals(Storage.BlobListOption.versions(true), option);
     }
+
+    @Test
+    void testResetTimestampTrackingWhenBucketModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+        runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, processor.getCurrentTimestamp());
+        mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, 
Scope.CLUSTER);
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetTimestampTrackingWhenPrefixModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+        runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, processor.getCurrentTimestamp());
+        mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, 
Scope.CLUSTER);
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetTimestampTrackingWhenStrategyModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+        runner.setProperty(ListGCSBucket.LISTING_STRATEGY, 
ListGCSBucket.NO_TRACKING);
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertEquals(0, processor.getCurrentTimestamp());
+        mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP, 
Scope.CLUSTER);
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenBucketModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+
+        runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
processor.getIdentifier()), any());
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenPrefixModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+
+        runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
processor.getIdentifier()), any());
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    @Test
+    void testResetEntityTrackingWhenStrategyModified() throws Exception {
+        setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+        assertFalse(processor.isResetTracking());
+
+        runner.run();
+
+        assertNotNull(processor.getListedEntityTracker());
+
+        runner.setProperty(ListGCSBucket.LISTING_STRATEGY, 
ListGCSBucket.NO_TRACKING);
+
+        assertTrue(processor.isResetTracking());
+
+        runner.run();
+
+        assertNull(processor.getListedEntityTracker());
+        verify(mockCache).remove(eq("ListedEntities::" + 
processor.getIdentifier()), any());
+
+        assertFalse(processor.isResetTracking());
+    }
+
+    private void setUpResetTrackingTest(AllowableValue listingStrategy) throws 
Exception {
+        runner.setProperty(ListGCSBucket.LISTING_STRATEGY, listingStrategy);
+        runner.setProperty(ListGCSBucket.PREFIX, "prefix1");
+
+        if (listingStrategy == ListGCSBucket.BY_TIMESTAMPS) {
+            Map<String, String> map = new HashMap<>();
+            map.put(ListGCSBucket.CURRENT_TIMESTAMP, Long.toString(TIMESTAMP));
+            map.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "file");
+            mockStateManager.setState(map, Scope.CLUSTER);
+        } else if (listingStrategy == ListGCSBucket.BY_ENTITIES) {
+            String serviceId = "DistributedMapCacheClient";
+            when(mockCache.getIdentifier()).thenReturn(serviceId);
+            runner.addControllerService(serviceId, mockCache);
+            runner.enableControllerService(mockCache);
+            runner.setProperty(ListGCSBucket.TRACKING_STATE_CACHE, serviceId);
+        }
+
+        when(storage.list(anyString(), 
any(Storage.BlobListOption.class))).thenReturn(new PageImpl<>(null, null, 
null));
+    }
+
+    private Blob buildMockBlob(final String bucket, final String key, final 
long updateTime) {
+        final Blob blob = mock(Blob.class);
+        when(blob.getBucket()).thenReturn(bucket);
+        when(blob.getName()).thenReturn(key);
+        when(blob.getUpdateTime()).thenReturn(updateTime);
+        return blob;
+    }
+
+    private Blob buildMockBlobWithoutBucket(final String bucket, final String 
key, final long updateTime) {
+        final Blob blob = mock(Blob.class);
+        when(blob.getName()).thenReturn(key);
+        when(blob.getUpdateTime()).thenReturn(updateTime);
+        return blob;
+    }
+
+    private void verifyConfigVerification(final TestRunner runner, final 
ListGCSBucket processor, final int expectedCount) {
+        final List<ConfigVerificationResult> verificationResults = 
processor.verify(runner.getProcessContext(), runner.getLogger(), 
Collections.emptyMap());
+        assertEquals(3, verificationResults.size());
+        final ConfigVerificationResult cloudServiceResult = 
verificationResults.get(0);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
cloudServiceResult.getOutcome());
+
+        final ConfigVerificationResult iamPermissionsResult = 
verificationResults.get(1);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
iamPermissionsResult.getOutcome());
+
+        final ConfigVerificationResult listingResult = 
verificationResults.get(2);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
listingResult.getOutcome());
+
+        assertTrue(
+                
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", 
expectedCount)),
+                String.format("Expected %s blobs to be counted, but 
explanation was: %s", expectedCount, listingResult.getExplanation()));
+    }
 }
\ No newline at end of file

Reply via email to