This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new e97431f060 NIFI-12936 ListGCSBucket resets its tracking state after
configuration change
e97431f060 is described below
commit e97431f0604fccad84c79048758eed567db117e1
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Mar 25 19:19:49 2024 +0100
NIFI-12936 ListGCSBucket resets its tracking state after configuration
change
Signed-off-by: Pierre Villard <[email protected]>
This closes #8563.
---
.../nifi/processors/gcp/storage/ListGCSBucket.java | 63 +++-
.../processors/gcp/storage/ListGCSBucketTest.java | 409 ++++++++++++---------
2 files changed, 281 insertions(+), 191 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 627f66654d..be18311356 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -69,6 +69,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -274,40 +275,64 @@ public class ListGCSBucket extends AbstractGCSProcessor {
return relationships;
}
- // State tracking
+ private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES =
Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(
+ BUCKET,
+ PREFIX,
+ LISTING_STRATEGY
+ ))
+ );
+
+ // used by Tracking Timestamps tracking strategy
public static final String CURRENT_TIMESTAMP = "currentTimestamp";
public static final String CURRENT_KEY_PREFIX = "key-";
private volatile long currentTimestamp = 0L;
private final Set<String> currentKeys = Collections.synchronizedSet(new
HashSet<>());
- private volatile boolean justElectedPrimaryNode = false;
- private volatile boolean resetEntityTrackingState = false;
+ // used by Tracking Entities tracking strategy
private volatile ListedEntityTracker<ListableBlob> listedEntityTracker;
+ private volatile boolean justElectedPrimaryNode = false;
+ private volatile boolean resetTracking = false;
+
@OnPrimaryNodeStateChange
public void onPrimaryNodeChange(final PrimaryNodeState newState) {
justElectedPrimaryNode = (newState ==
PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ if (isConfigurationRestored() &&
TRACKING_RESET_PROPERTIES.contains(descriptor)) {
+ resetTracking = true;
+ }
+ }
+
@OnScheduled
- public void initListedEntityTracker(ProcessContext context) {
- final boolean isTrackingEntityStrategy =
BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
- if (listedEntityTracker != null && (resetEntityTrackingState ||
!isTrackingEntityStrategy)) {
+ public void initTrackingStrategy(ProcessContext context) throws
IOException {
+ final String listingStrategy =
context.getProperty(LISTING_STRATEGY).getValue();
+ final boolean isTrackingTimestampsStrategy =
BY_TIMESTAMPS.getValue().equals(listingStrategy);
+ final boolean isTrackingEntityStrategy =
BY_ENTITIES.getValue().equals(listingStrategy);
+
+ if (resetTracking || !isTrackingTimestampsStrategy) {
+ context.getStateManager().clear(Scope.CLUSTER);
+ currentTimestamp = 0L;
+ currentKeys.clear();
+ }
+
+ if (listedEntityTracker != null && (resetTracking ||
!isTrackingEntityStrategy)) {
try {
listedEntityTracker.clearListedEntities();
+ listedEntityTracker = null;
} catch (IOException e) {
throw new RuntimeException("Failed to reset previously listed
entities", e);
}
}
- resetEntityTrackingState = false;
- if (isTrackingEntityStrategy) {
- if (listedEntityTracker == null) {
- listedEntityTracker = createListedEntityTracker();
- }
- } else {
- listedEntityTracker = null;
+ if (isTrackingEntityStrategy && listedEntityTracker == null) {
+ listedEntityTracker = createListedEntityTracker();
}
+
+ resetTracking = false;
}
protected ListedEntityTracker<ListableBlob> createListedEntityTracker() {
@@ -1026,4 +1051,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
return count;
}
}
+
+ long getCurrentTimestamp() {
+ return currentTimestamp;
+ }
+
+ ListedEntityTracker<ListableBlob> getListedEntityTracker() {
+ return listedEntityTracker;
+ }
+
+ boolean isResetTracking() {
+ return resetTracking;
+ }
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index e7999ccefc..b0b19ac17e 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -17,19 +17,24 @@
package org.apache.nifi.processors.gcp.storage;
import com.google.api.gax.paging.Page;
+import com.google.cloud.PageImpl;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
@@ -69,17 +74,20 @@ import static
org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_ATTR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link ListGCSBucket} which do not consume Google Cloud
resources.
*/
public class ListGCSBucketTest extends AbstractGCSTest {
+
private static final String PREFIX = "test-prefix";
private static final Boolean USE_GENERATIONS = true;
@@ -106,16 +114,29 @@ public class ListGCSBucketTest extends AbstractGCSTest {
private static final Long CREATE_TIME = 1234L;
private static final Long UPDATE_TIME = 4567L;
private final static Long GENERATION = 5L;
+ private static final long TIMESTAMP = 1234567890;
@Mock
Storage storage;
+ @Mock
+ Page<Blob> mockBlobPage;
+
@Captor
ArgumentCaptor<Storage.BlobListOption> argumentCaptor;
- @Override
- public ListGCSBucket getProcessor() {
- return new ListGCSBucket() {
+ private TestRunner runner;
+
+ private ListGCSBucket processor;
+
+ private MockStateManager mockStateManager;
+
+ @Mock
+ private DistributedMapCacheClient mockCache;
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ processor = new ListGCSBucket() {
@Override
protected Storage getCloudService() {
return storage;
@@ -126,21 +147,25 @@ public class ListGCSBucketTest extends AbstractGCSTest {
return storage;
}
};
+
+ runner = buildNewRunner(processor);
+ runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
+ runner.assertValid();
+
+ mockStateManager = runner.getStateManager();
+ }
+
+ @Override
+ public ListGCSBucket getProcessor() {
+ return processor;
}
@Override
protected void addRequiredPropertiesToRunner(TestRunner runner) {
- runner.setProperty(ListGCSBucket.BUCKET, BUCKET);
}
@Test
public void testRestoreFreshState() throws Exception {
- reset(storage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
assertEquals(-1L,
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion(),
"Cluster StateMap should be fresh (version -1L)");
assertTrue(processor.getStateKeys().isEmpty());
@@ -150,17 +175,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals(0L, processor.getStateTimestamp());
assertTrue(processor.getStateKeys().isEmpty());
-
}
@Test
public void testRestorePreviousState() throws Exception {
- reset(storage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(4L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "test-key-0");
@@ -181,12 +199,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testPersistState() throws Exception {
- reset(storage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
assertEquals(-1L,
runner.getProcessContext().getStateManager().getState(Scope.CLUSTER).getVersion(),
"Cluster StateMap should be fresh (version -1L)"
@@ -208,13 +220,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
- public void testFailedPersistState() throws Exception {
- reset(storage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testFailedPersistState() {
runner.getStateManager().setFailOnStateSet(Scope.CLUSTER, true);
final Set<String> keys = new HashSet<>(Arrays.asList("test-key-0",
"test-key-1"));
@@ -234,52 +240,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
// We could do more specific things like check the contents of the
LogMessage,
// but that seems too nitpicky.
-
- }
-
- @Mock
- Page<Blob> mockBlobPage;
-
- private Blob buildMockBlob(String bucket, String key, long updateTime) {
- final Blob blob = mock(Blob.class);
- when(blob.getBucket()).thenReturn(bucket);
- when(blob.getName()).thenReturn(key);
- when(blob.getUpdateTime()).thenReturn(updateTime);
- return blob;
- }
-
- private Blob buildMockBlobWithoutBucket(String bucket, String key, long
updateTime) {
- final Blob blob = mock(Blob.class);
- when(blob.getName()).thenReturn(key);
- when(blob.getUpdateTime()).thenReturn(updateTime);
- return blob;
- }
-
- private void verifyConfigVerification(final TestRunner runner, final
ListGCSBucket processor, final int expectedCount) {
- final List<ConfigVerificationResult> verificationResults =
processor.verify(runner.getProcessContext(), runner.getLogger(),
Collections.emptyMap());
- assertEquals(3, verificationResults.size());
- final ConfigVerificationResult cloudServiceResult =
verificationResults.get(0);
- assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
cloudServiceResult.getOutcome());
-
- final ConfigVerificationResult iamPermissionsResult =
verificationResults.get(1);
- assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
iamPermissionsResult.getOutcome());
-
- final ConfigVerificationResult listingResult =
verificationResults.get(2);
- assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
listingResult.getOutcome());
-
- assertTrue(
-
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*",
expectedCount)),
- String.format("Expected %s blobs to be counted, but
explanation was: %s", expectedCount, listingResult.getExplanation()));
}
@Test
- public void testSuccessfulList() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testSuccessfulList() {
final Iterable<Blob> mockList = Arrays.asList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
@@ -316,11 +280,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
- public void testNoTrackingListing() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
+ public void testNoTrackingListing() {
runner.setProperty(ListGCSBucket.LISTING_STRATEGY,
ListGCSBucket.NO_TRACKING);
runner.assertValid();
@@ -366,12 +326,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testOldValues() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
final Iterable<Blob> mockList = Collections.singletonList(
buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
);
@@ -394,16 +348,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("2",
runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP));
}
-
-
@Test
public void testEmptyList() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
final Iterable<Blob> mockList = Collections.emptyList();
when(mockBlobPage.getValues()).thenReturn(mockList);
@@ -423,12 +369,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndFilesComingInAlphabeticalOrder() throws
Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1");
@@ -467,12 +407,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws
Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -516,12 +450,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndNewFilesComingWithTheSameTimestamp()
throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -571,12 +499,6 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp()
throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
final Map<String, String> state = new LinkedHashMap<>();
state.put(ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L));
state.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2");
@@ -624,13 +546,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
- public void testAttributesSet() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testAttributesSet() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
when(blob.getSize()).thenReturn(SIZE);
when(blob.getCacheControl()).thenReturn(CACHE_CONTROL);
@@ -690,13 +606,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
- public void testAclOwnerUser() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testAclOwnerUser() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.User mockUser = mock(Acl.User.class);
when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL);
@@ -719,15 +629,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("user", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
-
@Test
- public void testAclOwnerGroup() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testAclOwnerGroup() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.Group mockGroup = mock(Acl.Group.class);
when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL);
@@ -750,16 +653,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("group", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
-
-
@Test
- public void testAclOwnerDomain() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testAclOwnerDomain() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.Domain mockDomain = mock(Acl.Domain.class);
when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN);
@@ -781,16 +676,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("domain", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
-
-
@Test
- public void testAclOwnerProject() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testAclOwnerProject() {
final Blob blob = buildMockBlob("test-bucket-1", "test-key-1", 2L);
final Acl.Project mockProject = mock(Acl.Project.class);
when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID);
@@ -813,15 +700,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals("project", flowFile.getAttribute(OWNER_TYPE_ATTR));
}
-
@Test
- public void testYieldOnBadStateRestore() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
- runner.assertValid();
-
+ public void testYieldOnBadStateRestore() {
final Iterable<Blob> mockList = Collections.emptyList();
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
@@ -833,12 +713,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
- public void testListOptionsPrefix() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
-
+ public void testListOptionsPrefix() {
runner.setProperty(ListGCSBucket.PREFIX, PREFIX);
runner.assertValid();
@@ -854,14 +729,8 @@ public class ListGCSBucketTest extends AbstractGCSTest {
assertEquals(Storage.BlobListOption.prefix(PREFIX),
argumentCaptor.getValue());
}
-
@Test
- public void testListOptionsVersions() throws Exception {
- reset(storage, mockBlobPage);
- final ListGCSBucket processor = getProcessor();
- final TestRunner runner = buildNewRunner(processor);
- addRequiredPropertiesToRunner(runner);
-
+ public void testListOptionsVersions() {
runner.setProperty(ListGCSBucket.USE_GENERATIONS,
String.valueOf(USE_GENERATIONS));
runner.assertValid();
@@ -877,4 +746,188 @@ public class ListGCSBucketTest extends AbstractGCSTest {
Storage.BlobListOption option = argumentCaptor.getValue();
assertEquals(Storage.BlobListOption.versions(true), option);
}
+
+ @Test
+ void testResetTimestampTrackingWhenBucketModified() throws Exception {
+ setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+ assertFalse(processor.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+ runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
+
+ assertTrue(processor.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, processor.getCurrentTimestamp());
+ mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP,
Scope.CLUSTER);
+
+ assertFalse(processor.isResetTracking());
+ }
+
+ @Test
+ void testResetTimestampTrackingWhenPrefixModified() throws Exception {
+ setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+ assertFalse(processor.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+ runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
+
+ assertTrue(processor.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, processor.getCurrentTimestamp());
+ mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP,
Scope.CLUSTER);
+
+ assertFalse(processor.isResetTracking());
+ }
+
+ @Test
+ void testResetTimestampTrackingWhenStrategyModified() throws Exception {
+ setUpResetTrackingTest(ListGCSBucket.BY_TIMESTAMPS);
+
+ assertFalse(processor.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TIMESTAMP, processor.getCurrentTimestamp());
+
+ runner.setProperty(ListGCSBucket.LISTING_STRATEGY,
ListGCSBucket.NO_TRACKING);
+
+ assertTrue(processor.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, processor.getCurrentTimestamp());
+ mockStateManager.assertStateNotSet(ListGCSBucket.CURRENT_TIMESTAMP,
Scope.CLUSTER);
+
+ assertFalse(processor.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenBucketModified() throws Exception {
+ setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+ assertFalse(processor.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(processor.getListedEntityTracker());
+
+ runner.setProperty(ListGCSBucket.BUCKET, "otherBucket");
+
+ assertTrue(processor.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(processor.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
processor.getIdentifier()), any());
+
+ assertFalse(processor.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenPrefixModified() throws Exception {
+ setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+ assertFalse(processor.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(processor.getListedEntityTracker());
+
+ runner.setProperty(ListGCSBucket.PREFIX, "prefix2");
+
+ assertTrue(processor.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(processor.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
processor.getIdentifier()), any());
+
+ assertFalse(processor.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenStrategyModified() throws Exception {
+ setUpResetTrackingTest(ListGCSBucket.BY_ENTITIES);
+
+ assertFalse(processor.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(processor.getListedEntityTracker());
+
+ runner.setProperty(ListGCSBucket.LISTING_STRATEGY,
ListGCSBucket.NO_TRACKING);
+
+ assertTrue(processor.isResetTracking());
+
+ runner.run();
+
+ assertNull(processor.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
processor.getIdentifier()), any());
+
+ assertFalse(processor.isResetTracking());
+ }
+
+ private void setUpResetTrackingTest(AllowableValue listingStrategy) throws
Exception {
+ runner.setProperty(ListGCSBucket.LISTING_STRATEGY, listingStrategy);
+ runner.setProperty(ListGCSBucket.PREFIX, "prefix1");
+
+ if (listingStrategy == ListGCSBucket.BY_TIMESTAMPS) {
+ Map<String, String> map = new HashMap<>();
+ map.put(ListGCSBucket.CURRENT_TIMESTAMP, Long.toString(TIMESTAMP));
+ map.put(ListGCSBucket.CURRENT_KEY_PREFIX + "0", "file");
+ mockStateManager.setState(map, Scope.CLUSTER);
+ } else if (listingStrategy == ListGCSBucket.BY_ENTITIES) {
+ String serviceId = "DistributedMapCacheClient";
+ when(mockCache.getIdentifier()).thenReturn(serviceId);
+ runner.addControllerService(serviceId, mockCache);
+ runner.enableControllerService(mockCache);
+ runner.setProperty(ListGCSBucket.TRACKING_STATE_CACHE, serviceId);
+ }
+
+ when(storage.list(anyString(),
any(Storage.BlobListOption.class))).thenReturn(new PageImpl<>(null, null,
null));
+ }
+
+ private Blob buildMockBlob(final String bucket, final String key, final
long updateTime) {
+ final Blob blob = mock(Blob.class);
+ when(blob.getBucket()).thenReturn(bucket);
+ when(blob.getName()).thenReturn(key);
+ when(blob.getUpdateTime()).thenReturn(updateTime);
+ return blob;
+ }
+
+ private Blob buildMockBlobWithoutBucket(final String bucket, final String
key, final long updateTime) {
+ final Blob blob = mock(Blob.class);
+ when(blob.getName()).thenReturn(key);
+ when(blob.getUpdateTime()).thenReturn(updateTime);
+ return blob;
+ }
+
+ private void verifyConfigVerification(final TestRunner runner, final
ListGCSBucket processor, final int expectedCount) {
+ final List<ConfigVerificationResult> verificationResults =
processor.verify(runner.getProcessContext(), runner.getLogger(),
Collections.emptyMap());
+ assertEquals(3, verificationResults.size());
+ final ConfigVerificationResult cloudServiceResult =
verificationResults.get(0);
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
cloudServiceResult.getOutcome());
+
+ final ConfigVerificationResult iamPermissionsResult =
verificationResults.get(1);
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
iamPermissionsResult.getOutcome());
+
+ final ConfigVerificationResult listingResult =
verificationResults.get(2);
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
listingResult.getOutcome());
+
+ assertTrue(
+
listingResult.getExplanation().matches(String.format(".*finding %s blobs.*",
expectedCount)),
+ String.format("Expected %s blobs to be counted, but
explanation was: %s", expectedCount, listingResult.getExplanation()));
+ }
}
\ No newline at end of file