This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 21a27c8 NIFI-6636: Fixed ListGCSBucket file duplication error
21a27c8 is described below
commit 21a27c8bb00c3f41836c938c3bb59ff428a27ca4
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Fri Sep 6 13:49:39 2019 +0200
NIFI-6636: Fixed ListGCSBucket file duplication error
ListGCSBucket duplicated files if they arrived not in alphabetical order.
The set storing the name of the latest blob (which was loaded with the
highest
timestamp during the previous run of the processor) was cleared too early.
Also changed the state persisting logic: it is now saved only once at the
end
of onTrigger() (similar to ListS3). Some inconsistent state (only blob names
without the timestamp) was also saved earlier.
This closes #3702.
Signed-off-by: Mark Payne <[email protected]>
---
.../nifi/processors/gcp/storage/ListGCSBucket.java | 41 ++-
.../processors/gcp/storage/ListGCSBucketTest.java | 386 ++++++++++++++++++---
2 files changed, 361 insertions(+), 66 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index 01293cf..f4cdb27 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -259,13 +259,16 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
final Storage storage = getCloudService();
- int listCount = 0;
+
long maxTimestamp = 0L;
+ Set<String> maxKeys = new HashSet<>();
- Page<Blob> blobPages = storage.list(bucket, listOptions.toArray(new
Storage.BlobListOption[listOptions.size()]));
+ Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new
Storage.BlobListOption[listOptions.size()]));
do {
- for (Blob blob : blobPages.getValues()) {
+ int listCount = 0;
+
+ for (Blob blob : blobPage.getValues()) {
long lastModified = blob.getUpdateTime();
if (lastModified < currentTimestamp
|| lastModified == currentTimestamp &&
currentKeys.contains(blob.getName())) {
@@ -381,40 +384,36 @@ public class ListGCSBucket extends AbstractGCSProcessor {
// Update state
if (lastModified > maxTimestamp) {
maxTimestamp = lastModified;
- currentKeys.clear();
+ maxKeys.clear();
}
if (lastModified == maxTimestamp) {
- currentKeys.add(blob.getName());
+ maxKeys.add(blob.getName());
}
listCount++;
}
- blobPages = blobPages.getNextPage();
commit(context, session, listCount);
- listCount = 0;
- } while (blobPages != null);
-
- currentTimestamp = maxTimestamp;
- final long listMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- getLogger().info("Successfully listed GCS bucket {} in {} millis", new
Object[]{bucket, listMillis});
+ blobPage = blobPage.getNextPage();
+ } while (blobPage != null);
- if (!commit(context, session, listCount)) {
- if (currentTimestamp > 0) {
- persistState(context);
- }
+ if (maxTimestamp != 0) {
+ currentTimestamp = maxTimestamp;
+ currentKeys = maxKeys;
+ persistState(context);
+ } else {
getLogger().debug("No new objects in GCS bucket {} to list.
Yielding.", new Object[]{bucket});
context.yield();
}
+
+ final long listMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ getLogger().info("Successfully listed GCS bucket {} in {} millis", new
Object[]{bucket, listMillis});
}
- private boolean commit(final ProcessContext context, final ProcessSession
session, int listCount) {
- boolean willCommit = listCount > 0;
- if (willCommit) {
+ private void commit(final ProcessContext context, final ProcessSession
session, int listCount) {
+ if (listCount > 0) {
getLogger().info("Successfully listed {} new files from GCS;
routing to success", new Object[] {listCount});
session.commit();
- persistState(context);
}
- return willCommit;
}
}
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index e17cf4b..1a925d0 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -256,7 +256,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Mock
- Page<Blob> mockBlobPages;
+ Page<Blob> mockBlobPage;
private Blob buildMockBlob(String bucket, String key, long updateTime) {
final Blob blob = mock(Blob.class);
@@ -268,7 +268,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testSuccessfulList() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -279,13 +279,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
buildMockBlob("blob-bucket-2", "blob-key-2", 3L)
);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -343,7 +343,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testOldValues() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -353,13 +353,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
buildMockBlob("blob-bucket-1", "blob-key-1", 2L)
);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.enqueue("test2");
@@ -384,7 +384,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testEmptyList() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -392,13 +392,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -413,8 +413,304 @@ public class ListGCSBucketTest extends AbstractGCSTest {
}
@Test
+ public void testListWithStateAndFilesComingInAlphabeticalOrder() throws
Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-1"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 2L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+
+ final List<MockFlowFile> successes =
runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-2",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-2",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 2L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-2"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
+ public void testListWithStateAndFilesComingNotInAlphabeticalOrder() throws
Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 1L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+
+ final List<MockFlowFile> successes =
runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-1",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-1",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 2L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-1"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
+ public void testListWithStateAndNewFilesComingWithTheSameTimestamp()
throws Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 2L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
+ buildMockBlob("blob-bucket-3", "blob-key-3", 2L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> successes =
runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-1",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-1",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ flowFile = successes.get(1);
+ assertEquals(
+ "blob-bucket-3",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-3",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "2",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 2L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-1",
+ "blob-key-3"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
+ public void testListWithStateAndNewFilesComingWithTheCurrentTimestamp()
throws Exception {
+ reset(storage, mockBlobPage);
+ final ListGCSBucket processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
+ addRequiredPropertiesToRunner(runner);
+ runner.assertValid();
+
+ final Map<String, String> state = ImmutableMap.of(
+ ListGCSBucket.CURRENT_TIMESTAMP, String.valueOf(1L),
+ ListGCSBucket.CURRENT_KEY_PREFIX + "0", "blob-key-2"
+ );
+
+ runner.getStateManager().setState(state, Scope.CLUSTER);
+
+ final Iterable<Blob> mockList = ImmutableList.of(
+ buildMockBlob("blob-bucket-1", "blob-key-1", 1L),
+ buildMockBlob("blob-bucket-2", "blob-key-2", 1L),
+ buildMockBlob("blob-bucket-3", "blob-key-3", 1L)
+ );
+
+ when(mockBlobPage.getValues())
+ .thenReturn(mockList);
+
+ when(mockBlobPage.getNextPage()).thenReturn(null);
+
+ when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
+ .thenReturn(mockBlobPage);
+
+ runner.enqueue("test");
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
+ runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+
+ final List<MockFlowFile> successes =
runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
+
+ MockFlowFile flowFile = successes.get(0);
+ assertEquals(
+ "blob-bucket-1",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-1",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "1",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ flowFile = successes.get(1);
+ assertEquals(
+ "blob-bucket-3",
+ flowFile.getAttribute(BUCKET_ATTR)
+ );
+
+ assertEquals(
+ "blob-key-3",
+ flowFile.getAttribute(KEY_ATTR)
+ );
+
+ assertEquals(
+ "1",
+ flowFile.getAttribute(UPDATE_TIME_ATTR)
+ );
+
+ assertEquals(
+ 1L,
+ processor.currentTimestamp
+ );
+
+ assertEquals(
+ ImmutableSet.of(
+ "blob-key-1",
+ "blob-key-3"
+ ),
+ processor.currentKeys
+ );
+ }
+
+ @Test
public void testAttributesSet() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -447,13 +743,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -555,7 +851,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerUser() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -568,13 +864,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -598,7 +894,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerGroup() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -611,13 +907,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -642,7 +938,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerDomain() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -655,13 +951,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -686,7 +982,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testAclOwnerProject() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -699,13 +995,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of(blob);
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -729,7 +1025,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testYieldOnBadStateRestore() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -737,13 +1033,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
runner.enqueue("test");
@@ -758,7 +1054,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListOptionsPrefix() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -772,13 +1068,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), argumentCaptor.capture()))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();
@@ -793,7 +1089,7 @@ public class ListGCSBucketTest extends AbstractGCSTest {
@Test
public void testListOptionsVersions() throws Exception {
- reset(storage, mockBlobPages);
+ reset(storage, mockBlobPage);
final ListGCSBucket processor = getProcessor();
final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
@@ -806,13 +1102,13 @@ public class ListGCSBucketTest extends AbstractGCSTest {
final Iterable<Blob> mockList = ImmutableList.of();
- when(mockBlobPages.getValues())
+ when(mockBlobPage.getValues())
.thenReturn(mockList);
- when(mockBlobPages.getNextPage()).thenReturn(null);
+ when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), argumentCaptor.capture()))
- .thenReturn(mockBlobPages);
+ .thenReturn(mockBlobPage);
runner.enqueue("test");
runner.run();