This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8f63e928a9 NIFI-12732 ListS3 resets its tracking state after configuration change
8f63e928a9 is described below
commit 8f63e928a9d1ed5c3bf90ebf9766b5fcf4655e46
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu Feb 1 13:16:42 2024 +0100
NIFI-12732 ListS3 resets its tracking state after configuration change
This closes #8348.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../org/apache/nifi/processors/aws/s3/ListS3.java | 72 +++--
.../apache/nifi/processors/aws/s3/TestListS3.java | 293 ++++++++++++++++++---
2 files changed, 309 insertions(+), 56 deletions(-)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 94a1c9d127..b1b085ca38 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -318,15 +318,25 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
public static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
+ private static final Set<PropertyDescriptor> TRACKING_RESET_PROPERTIES = Set.of(
+ BUCKET_WITHOUT_DEFAULT_VALUE,
+ REGION,
+ PREFIX,
+ LISTING_STRATEGY
+ );
+
public static final String CURRENT_TIMESTAMP = "currentTimestamp";
public static final String CURRENT_KEY_PREFIX = "key-";
- // State tracking
- private final AtomicReference<ListingSnapshot> listing = new AtomicReference<>(new ListingSnapshot(0L, Collections.emptySet()));
+ // used by Tracking Timestamps tracking strategy
+ private final AtomicReference<ListingSnapshot> listing = new AtomicReference<>(ListingSnapshot.empty());
- private volatile boolean justElectedPrimaryNode = false;
- private volatile boolean resetEntityTrackingState = false;
+ // used by Tracking Entities tracking strategy
private volatile ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> listedEntityTracker;
+
+ private volatile boolean justElectedPrimaryNode = false;
+ private volatile boolean resetTracking = false;
+
private volatile Long minObjectAgeMilliseconds;
private volatile Long maxObjectAgeMilliseconds;
@@ -335,26 +345,42 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
}
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (isConfigurationRestored() && TRACKING_RESET_PROPERTIES.contains(descriptor)) {
+ resetTracking = true;
+ }
+ }
+
@OnScheduled
- public void initListedEntityTracker(ProcessContext context) {
- final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(context.getProperty(LISTING_STRATEGY).getValue());
- if (listedEntityTracker != null && (resetEntityTrackingState || !isTrackingEntityStrategy)) {
+ public void initTrackingStrategy(ProcessContext context) throws IOException {
+ final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+ final boolean isTrackingTimestampsStrategy = BY_TIMESTAMPS.getValue().equals(listingStrategy);
+ final boolean isTrackingEntityStrategy = BY_ENTITIES.getValue().equals(listingStrategy);
+
+ if (resetTracking || !isTrackingTimestampsStrategy) {
+ context.getStateManager().clear(Scope.CLUSTER);
+ listing.set(ListingSnapshot.empty());
+ }
+
+ if (listedEntityTracker != null && (resetTracking || !isTrackingEntityStrategy)) {
try {
listedEntityTracker.clearListedEntities();
+ listedEntityTracker = null;
} catch (IOException e) {
throw new RuntimeException("Failed to reset previously listed entities", e);
}
}
- resetEntityTrackingState = false;
- if (isTrackingEntityStrategy) {
- if (listedEntityTracker == null) {
- listedEntityTracker = createListedEntityTracker();
- }
- } else {
- listedEntityTracker = null;
+ if (isTrackingEntityStrategy && listedEntityTracker == null) {
+ listedEntityTracker = createListedEntityTracker();
}
+ resetTracking = false;
+ }
+
+ @OnScheduled
+ public void initObjectAgeThresholds(ProcessContext context) {
minObjectAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
maxObjectAgeMilliseconds = context.getProperty(MAX_AGE) != null ? context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS) : null;
}
@@ -1167,7 +1193,7 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
}
}
- private static class ListingSnapshot {
+ static class ListingSnapshot {
private final long timestamp;
private final Set<String> keys;
@@ -1183,6 +1209,10 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
public Set<String> getKeys() {
return keys;
}
+
+ public static ListingSnapshot empty() {
+ return new ListingSnapshot(0L, Collections.emptySet());
+ }
}
@Override
@@ -1262,4 +1292,16 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
versionSummary.setIsLatest(true);
return versionSummary;
}
+
+ ListingSnapshot getListingSnapshot() {
+ return listing.get();
+ }
+
+ ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> getListedEntityTracker() {
+ return listedEntityTracker;
+ }
+
+ boolean isResetTracking() {
+ return resetTracking;
+ }
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 1a8259a268..e5655ebdf0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -20,6 +20,7 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
@@ -32,8 +33,10 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.VersionListing;
import org.apache.commons.lang3.time.DateUtils;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
@@ -60,25 +63,39 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestListS3 {
- private TestRunner runner = null;
- private AmazonS3Client mockS3Client = null;
+ private static final long TEST_TIMESTAMP = 1234567890;
+
+ private TestRunner runner;
+ private ListS3 listS3;
+ private AmazonS3Client mockS3Client;
+ private MockStateManager mockStateManager;
+ private DistributedMapCacheClient mockCache;
@BeforeEach
public void setUp() {
- mockS3Client = Mockito.mock(AmazonS3Client.class);
- final ListS3 mockListS3 = new ListS3() {
+ mockS3Client = mock(AmazonS3Client.class);
+ listS3 = new ListS3() {
@Override
protected AmazonS3Client createClient(final ProcessContext
context, final AWSCredentialsProvider credentialsProvider, final Region region,
final ClientConfiguration config,
final
AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
return mockS3Client;
}
};
- runner = TestRunners.newTestRunner(mockListS3);
+ runner = TestRunners.newTestRunner(listS3);
+ mockStateManager = runner.getStateManager();
+ mockCache = mock(DistributedMapCacheClient.class);
AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
}
@@ -105,16 +122,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -162,16 +179,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
@@ -209,16 +226,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertTrue(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -265,16 +282,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsV2Request> captureRequest =
ArgumentCaptor.forClass(ListObjectsV2Request.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
ListObjectsV2Request request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -312,16 +329,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjectsV2(Mockito.any(ListObjectsV2Request.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsV2Request> captureRequest =
ArgumentCaptor.forClass(ListObjectsV2Request.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjectsV2(captureRequest.capture());
ListObjectsV2Request request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertTrue(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -355,15 +372,15 @@ public class TestListS3 {
versionSummary2.setVersionId("2");
versionSummary2.setLastModified(lastModified);
versionListing.getVersionSummaries().add(versionSummary2);
-
Mockito.when(mockS3Client.listVersions(Mockito.any(ListVersionsRequest.class))).thenReturn(versionListing);
+
when(mockS3Client.listVersions(any(ListVersionsRequest.class))).thenReturn(versionListing);
runner.run();
ArgumentCaptor<ListVersionsRequest> captureRequest =
ArgumentCaptor.forClass(ListVersionsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listVersions(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listVersions(captureRequest.capture());
ListVersionsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listObjects(Mockito.any(ListObjectsRequest.class));
+ verify(mockS3Client,
Mockito.never()).listObjects(any(ListObjectsRequest.class));
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -401,15 +418,15 @@ public class TestListS3 {
objectSummary1.setKey("test-key");
objectSummary1.setLastModified(objectLastModified);
objectListing.getObjectSummaries().add(objectSummary1);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 0);
}
@@ -440,7 +457,7 @@ public class TestListS3 {
objectSummary3.setKey("now");
objectSummary3.setLastModified(lastModifiedNow);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
Map<String,String> stateMap = new HashMap<>();
String previousTimestamp =
String.valueOf(lastModifiedMinus3Hour.getTime());
@@ -451,10 +468,10 @@ public class TestListS3 {
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -490,7 +507,7 @@ public class TestListS3 {
objectSummary3.setKey("now");
objectSummary3.setLastModified(lastModifiedNow);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
Map<String,String> stateMap = new HashMap<>();
String previousTimestamp =
String.valueOf(lastModifiedMinus3Hour.getTime());
@@ -499,10 +516,10 @@ public class TestListS3 {
runner.getStateManager().setState(stateMap, Scope.CLUSTER);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -528,17 +545,17 @@ public class TestListS3 {
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<GetObjectTaggingRequest> captureRequest =
ArgumentCaptor.forClass(GetObjectTaggingRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).getObjectTagging(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).getObjectTagging(captureRequest.capture());
GetObjectTaggingRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("a", request.getKey());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
}
@Test
@@ -555,18 +572,18 @@ public class TestListS3 {
objectSummary1.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary1);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<GetObjectMetadataRequest> captureRequest =
ArgumentCaptor.forClass(GetObjectMetadataRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).getObjectMetadata(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).getObjectMetadata(captureRequest.capture());
GetObjectMetadataRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertEquals("a", request.getKey());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
}
@Test
@@ -592,16 +609,16 @@ public class TestListS3 {
objectSummary3.setKey("d/e");
objectSummary3.setLastModified(lastModified);
objectListing.getObjectSummaries().add(objectSummary3);
-
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(objectListing);
runner.run();
ArgumentCaptor<ListObjectsRequest> captureRequest =
ArgumentCaptor.forClass(ListObjectsRequest.class);
- Mockito.verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
+ verify(mockS3Client,
Mockito.times(1)).listObjects(captureRequest.capture());
ListObjectsRequest request = captureRequest.getValue();
assertEquals("test-bucket", request.getBucketName());
assertFalse(request.isRequesterPays());
- Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ verify(mockS3Client, Mockito.never()).listVersions(any());
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
@@ -627,4 +644,198 @@ public class TestListS3 {
runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 3);
runner.getStateManager().assertStateEquals(ListS3.CURRENT_TIMESTAMP,
null, Scope.CLUSTER);
}
+
+ @Test
+ void testResetTimestampTrackingWhenBucketModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "otherBucket");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0",
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+ @Test
+ void testResetTimestampTrackingWhenRegionModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0",
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetTimestampTrackingWhenPrefixModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.PREFIX, "prefix2");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateEquals(ListS3.CURRENT_TIMESTAMP, "0",
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetTimestampTrackingWhenStrategyModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_TIMESTAMPS);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(TEST_TIMESTAMP,
listS3.getListingSnapshot().getTimestamp());
+
+ runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertEquals(0, listS3.getListingSnapshot().getTimestamp());
+ mockStateManager.assertStateNotSet(ListS3.CURRENT_TIMESTAMP,
Scope.CLUSTER);
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenBucketModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "otherBucket");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenRegionModified() throws Exception {
+
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.REGION, Regions.EU_CENTRAL_1.getName());
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenPrefixModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.PREFIX, "prefix2");
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ @Test
+ void testResetEntityTrackingWhenStrategyModified() throws Exception {
+ setUpResetTrackingTest(ListS3.BY_ENTITIES);
+
+ assertFalse(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNotNull(listS3.getListedEntityTracker());
+
+ runner.setProperty(ListS3.LISTING_STRATEGY, ListS3.NO_TRACKING);
+
+ assertTrue(listS3.isResetTracking());
+
+ runner.run();
+
+ assertNull(listS3.getListedEntityTracker());
+ verify(mockCache).remove(eq("ListedEntities::" +
listS3.getIdentifier()), any());
+
+ assertFalse(listS3.isResetTracking());
+ }
+
+ private void setUpResetTrackingTest(AllowableValue listingStrategy) throws Exception {
+ runner.setProperty(ListS3.BUCKET_WITHOUT_DEFAULT_VALUE, "test-bucket");
+ runner.setProperty(ListS3.LISTING_STRATEGY, listingStrategy);
+ runner.setProperty(ListS3.PREFIX, "prefix1");
+
+ if (listingStrategy == ListS3.BY_TIMESTAMPS) {
+ mockStateManager.setState(Map.of(ListS3.CURRENT_TIMESTAMP, Long.toString(TEST_TIMESTAMP), ListS3.CURRENT_KEY_PREFIX + "0", "file"), Scope.CLUSTER);
+ } else if (listingStrategy == ListS3.BY_ENTITIES) {
+ String serviceId = "DistributedMapCacheClient";
+ when(mockCache.getIdentifier()).thenReturn(serviceId);
+ runner.addControllerService(serviceId, mockCache);
+ runner.enableControllerService(mockCache);
+ runner.setProperty(ListS3.TRACKING_STATE_CACHE, serviceId);
+ }
+
+ when(mockS3Client.listObjects(any(ListObjectsRequest.class))).thenReturn(new ObjectListing());
+ }
}