This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 47c41ba8d7 NIFI-12732 ListS3 resets its tracking state after
configuration change
47c41ba8d7 is described below
commit 47c41ba8d72d3f3494d42f6d4c32da53fc009a7b
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu Feb 1 13:16:42 2024 +0100
NIFI-12732 ListS3 resets its tracking state after configuration change
This closes #8360.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../org/apache/nifi/processors/aws/s3/ListS3.java | 74 ++++--
.../apache/nifi/processors/aws/s3/TestListS3.java | 296 ++++++++++++++++++---
2 files changed, 314 insertions(+), 56 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 431778f1ef..cd0de3cb2f 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -326,15 +326,27 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
public static final Set<Relationship> relationships =
Collections.singleton(REL_SUCCESS);
+ private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES =
Collections.unmodifiableSet(
+ new HashSet<>(Arrays.asList(
+ BUCKET,
+ REGION,
+ PREFIX,
+ LISTING_STRATEGY
+ ))
+ );
+
public static final String CURRENT_TIMESTAMP = "currentTimestamp";
public static final String CURRENT_KEY_PREFIX = "key-";
- // State tracking
- private final AtomicReference<ListingSnapshot> listing = new
AtomicReference<>(new ListingSnapshot(0L, Collections.emptySet()));
+ // used by Tracking Timestamps tracking strategy
+ private final AtomicReference<ListingSnapshot> listing = new
AtomicReference<>(ListingSnapshot.empty());
- private volatile boolean justElectedPrimaryNode = false;
- private volatile boolean resetEntityTrackingState = false;
+ // used by Tracking Entities tracking strategy
private volatile
ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>>
listedEntityTracker;
+
+ private volatile boolean justElectedPrimaryNode = false;
+ private volatile boolean resetTracking = false;
+
private volatile Long minObjectAgeMilliseconds;
private volatile Long maxObjectAgeMilliseconds;
@@ -343,26 +355,42 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
justElectedPrimaryNode = (newState ==
PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
+ if (isConfigurationRestored() &&
TRACKING_RESET_PROPERTIES.contains(descriptor)) {
+ resetTracking = true;
+ }
+ }
+
@OnScheduled
- public void initListedEntityTracker(ProcessContext context) {
- final boolean isTrackingEntityStrategy =
BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
- if (listedEntityTracker != null && (resetEntityTrackingState ||
!isTrackingEntityStrategy)) {
+ public void initTrackingStrategy(ProcessContext context) throws
IOException {
+ final String listingStrategy =
context.getProperty(LISTING_STRATEGY).getValue();
+ final boolean isTrackingTimestampsStrategy =
BY_TIMESTAMPS.getValue().equals(listingStrategy);
+ final boolean isTrackingEntityStrategy =
BY_ENTITIES.getValue().equals(listingStrategy);
+
+ if (resetTracking || !isTrackingTimestampsStrategy) {
+ context.getStateManager().clear(Scope.CLUSTER);
+ listing.set(ListingSnapshot.empty());
+ }
+
+ if (listedEntityTracker != null && (resetTracking ||
!isTrackingEntityStrategy)) {
try {
listedEntityTracker.clearListedEntities();
+ listedEntityTracker = null;
} catch (IOException e) {
throw new RuntimeException("Failed to reset previously listed
entities", e);
}
}
- resetEntityTrackingState = false;
- if (isTrackingEntityStrategy) {
- if (listedEntityTracker == null) {
- listedEntityTracker = createListedEntityTracker();
- }
- } else {
- listedEntityTracker = null;
+ if (isTrackingEntityStrategy && listedEntityTracker == null) {
+ listedEntityTracker = createListedEntityTracker();
}
+ resetTracking = false;
+ }
+
+ @OnScheduled
+ public void initObjectAgeThresholds(ProcessContext context) {
minObjectAgeMilliseconds =
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
maxObjectAgeMilliseconds = context.getProperty(MAX_AGE) != null ?
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
}
@@ -1175,7 +1203,7 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
}
}
- private static class ListingSnapshot {
+ static class ListingSnapshot {
private final long timestamp;
private final Set<String> keys;
@@ -1191,6 +1219,10 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
public Set<String> getKeys() {
return keys;
}
+
+ public static ListingSnapshot empty() {
+ return new ListingSnapshot(0L, Collections.emptySet());
+ }
}
@Override
@@ -1270,4 +1302,16 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
versionSummary.setIsLatest(true);
return versionSummary;
}
+
+ ListingSnapshot getListingSnapshot() {
+ return listing.get();
+ }
+
+ ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>>
getListedEntityTracker() {
+ return listedEntityTracker;
+ }
+
+ boolean isResetTracking() {
+ return resetTracking;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index aede9b2a55..b3228fd344 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.aws.s3;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@@ -30,8 +31,10 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import org.apache.commons.lang3.time.DateUtils;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.reporting.InitializationException;
@@ -57,24 +60,38 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestListS3 {
- private TestRunner runner = null;
- private AmazonS3Client mockS3Client = null;
+ private static final long TEST_TIMESTAMP = 1234567890;
+
+ private TestRunner runner;
+ private ListS3 listS3;
+ private AmazonS3Client mockS3Client;
+ private MockStateManager mockStateManager;
+ private DistributedMapCacheClient mockCache;
@BeforeEach
public void setUp() {
- mockS3Client = Mockito.mock(AmazonS3Client.class);
- final ListS3 mockListS3 = new ListS3() {
+ mockS3Client = mock(AmazonS3Client.class);
+ listS3 = new ListS3() {
@Override
protected AmazonS3Client createClient(ProcessContext context,
AWSCredentials credentials, ClientConfiguration config) {
return mockS3Client;
}
};
- runner = TestRunners.newTestRunner(mockListS3);
+ runner = TestRunners.newTestRunner(listS3);
+ mockStateManager = runner.getStateManager();
+ mockCache = mock(DistributedMapCacheClient.class);
}
@@ -100,16 +117,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -157,16 +174,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
@@ -204,16 +221,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertTrue(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -260,16 +277,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsV2Request> captureRequest =
ArgumentCaptor.forClass(ListObjectsV2Request.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
ListObjectsV2Request request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -307,16 +324,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsV2Request> captureRequest =
ArgumentCaptor.forClass(ListObjectsV2Request.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
ListObjectsV2Request request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertTrue(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -350,15 +367,15 @@ public class TestListS3 {
versionSummary2.setVersionId("2");
versionSummary2.setLastModified(lastModified);
versionListing.getVersionSummaries().add(versionSummary2);
-
Mockito.when(mockS3Client.listVersions(Mockito.any(ListVersionsRequest.class))).thenReturn(versionListing);
+
when(mockS3Client.listVersions(any(ListVersionsRequest.class))).thenReturn(versionListing);
runner.run();
ArgumentCaptor<ListVersionsRequest> captureRequest =
ArgumentCaptor.forClass(ListVersionsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listVersions(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listVersions(captureRequest.capture());
ListVersionsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listObjects(Mockito.any(ListObjectsRequest.class));
+ verify(mockS3Client,
Mockito.never()).listObjects(any(ListObjectsRequest.class));
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -396,15 +413,15 @@ public class TestListS3 {
objectSummary1.setKey("test-key");
objectSummary1.setLastModified(objectLastModified);
objectListing.getObjectSummaries().add(objectSummary1);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0);
}
@@ -435,7 +452,7 @@ public class TestListS3 {
objectSummary3.setKey("now");
objectSummary3.setLastModified(lastModifiedNow);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
Map<String,String> stateMap = new HashMap<>();
String previousTimestamp =
String.valueOf(lastModifiedMinus3Hour.getTime());
@@ -446,10 +463,10 @@ public class TestListS3 {
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -485,7 +502,7 @@ public class TestListS3 {
objectSummary3.setKey("now");
objectSummary3.setLastModified(lastModifiedNow);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
Map<String,String> stateMap = new HashMap<>();
String previousTimestamp =
String.valueOf(lastModifiedMinus3Hour.getTime());
@@ -494,10 +511,10 @@ public class TestListS3 {
runner.getStateManager().setState(stateMap, Scope.CLUSTER);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -523,17 +540,17 @@ public class TestListS3 {
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<GetObjectTaggingRequest> captureRequest =
ArgumentCaptor.forClass(GetObjectTaggingRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).getObjectTagging(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).getObjectTagging(captureRequest.capture());
GetObjectTaggingRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("a", request.getKey());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
}
@Test
@@ -550,18 +567,18 @@ public class TestListS3 {
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<GetObjectMetadataRequest> captureRequest =
ArgumentCaptor.forClass(GetObjectMetadataRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).getObjectMetadata(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).getObjectMetadata(captureRequest.capture());
GetObjectMetadataRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("a", request.getKey());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
}
@Test
@@ -587,16 +604,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -622,4 +639,201 @@ public class TestListS3 {
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP,
null, Scope.CLUSTER);
}
+
+ @Test
+ void testResetTimestampTrackingWhenBucketModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.BUCKET, "otherBucket");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0",
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+ @Test
+ void testResetTimestampTrackingWhenRegionModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0",
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetTimestampTrackingWhenPrefixModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.PREFIX, "prefix2");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0",
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetTimestampTrackingWhenStrategyModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateNotSet(ListS3.CURRENT_TIMESTAMP,
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenBucketModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.BUCKET, "otherBucket");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenRegionModified() throws Exception {
+
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenPrefixModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.PREFIX, "prefix2");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenStrategyModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ private void setUpResetTrackingTest(AllowableValue listingStrategy) throws
Exception {
+ runner.setProperty(ListS3.BUCKET, "test-bucket");
+ runner.setProperty(ListS3.LISTING_STRATEGY, listingStrategy);
+ runner.setProperty(ListS3.PREFIX, "prefix1");
+
+ if (listingStrategy == ListS3.BY_TIMESTAMPS) {
+ Map<String, String> map = new HashMap<>();
+ map.put(ListS3.CURRENT_TIMESTAMP, Long.toString(TEST_TIMESTAMP));
+ map.put(ListS3.CURRENT_KEY_PREFIX + "0", "file");
+ mockStateManager.setState(map, Scope.CLUSTER);
+ } else if (listingStrategy == ListS3.BY_ENTITIES) {
+ String serviceId = "DistributedMapCacheClient";
+ when(mockCache.getIdentifier()).thenReturn(serviceId);
+ runner.addControllerService(serviceId, mockCache);
+ runner.enableControllerService(mockCache);
+ runner.setProperty(ListS3.TRACKING_STATE_CACHE, serviceId);
+ }
+
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(new
ObjectListing());
+ }
}