Repository: beam Updated Branches: refs/heads/master a9bcdcedf -> 5fe11a2fc
[BEAM-59] GcsUtil: refactor and expose getObject() and listObjects() APIs. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51a8c37a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51a8c37a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51a8c37a Branch: refs/heads/master Commit: 51a8c37aa0cdf002629787c342b3d21a20d4404c Parents: a9bcdce Author: Pei He <[email protected]> Authored: Fri Feb 10 14:40:55 2017 -0800 Committer: Pei He <[email protected]> Committed: Fri Feb 10 21:35:32 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/util/GcsUtil.java | 124 +++++++++---------- .../org/apache/beam/sdk/util/GcsUtilTest.java | 11 +- 2 files changed, 67 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/51a8c37a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 5e83584..44c49bc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.client.googleapis.batch.BatchRequest; import com.google.api.client.googleapis.batch.json.JsonBatchCallback; @@ -198,26 +197,14 @@ public class GcsUtil { String prefix = null; if (!m.matches()) { // Not a glob. - Storage.Objects.Get getObject = storageClient.objects().get( - gcsPattern.getBucket(), gcsPattern.getObject()); try { - // Use a get request to fetch the metadata of the object, - // the request has strong global consistency. - ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getObject), - BACKOFF_FACTORY.backoff(), - RetryDeterminer.SOCKET_ERRORS, - IOException.class); + // Use a get request to fetch the metadata of the object, and ignore the return value. + // The request has strong global consistency. + getObject(gcsPattern); return ImmutableList.of(gcsPattern); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { - // If the path was not found, return an empty list. - return ImmutableList.of(); - } - throw new IOException("Unable to match files for pattern " + gcsPattern, e); + } catch (FileNotFoundException e) { + // If the path was not found, return an empty list. + return ImmutableList.of(); } } else { // Part before the first wildcard character. @@ -228,32 +215,10 @@ public class GcsUtil { LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), prefix, p.toString()); - // List all objects that start with the prefix (including objects in sub-directories). - Storage.Objects.List listObject = storageClient.objects().list(gcsPattern.getBucket()); - listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); - listObject.setPrefix(prefix); - String pageToken = null; List<GcsPath> results = new LinkedList<>(); do { - if (pageToken != null) { - listObject.setPageToken(pageToken); - } - - Objects objects; - try { - objects = ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(listObject), - BACKOFF_FACTORY.backoff(), - RetryDeterminer.SOCKET_ERRORS, - IOException.class); - } catch (Exception e) { - throw new IOException("Unable to match files in bucket " + gcsPattern.getBucket() - + ", prefix " + prefix + " against pattern " + p.toString(), e); - } - //Objects objects = listObject.execute(); - checkNotNull(objects); - + Objects objects = listObjects(gcsPattern.getBucket(), prefix, pageToken); if (objects.getItems() == null) { break; } @@ -267,7 +232,6 @@ public class GcsUtil { results.add(GcsPath.fromObject(o)); } } - pageToken = objects.getNextPageToken(); } while (pageToken != null); @@ -285,33 +249,64 @@ public class GcsUtil { * if the resource does not exist. */ public long fileSize(GcsPath path) throws IOException { - return fileSize( - path, - BACKOFF_FACTORY.backoff(), - Sleeper.DEFAULT); + return getObject(path).getSize().longValue(); } /** - * Returns the file size from GCS or throws {@link FileNotFoundException} - * if the resource does not exist. + * Returns the {@link StorageObject} for the given {@link GcsPath}. */ + public StorageObject getObject(GcsPath gcsPath) throws IOException { + return getObject(gcsPath, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + } + @VisibleForTesting - long fileSize(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { + StorageObject getObject(GcsPath gcsPath, BackOff backoff, Sleeper sleeper) throws IOException { Storage.Objects.Get getObject = - storageClient.objects().get(path.getBucket(), path.getObject()); + storageClient.objects().get(gcsPath.getBucket(), gcsPath.getObject()); try { - StorageObject object = ResilientOperation.retry( + return ResilientOperation.retry( ResilientOperation.getGoogleRequestCallable(getObject), backoff, RetryDeterminer.SOCKET_ERRORS, IOException.class, sleeper); - return object.getSize().longValue(); - } catch (Exception e) { + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { - throw new FileNotFoundException(path.toString()); + throw new FileNotFoundException(gcsPath.toString()); } - throw new IOException("Unable to get file size", e); + throw new IOException( + String.format("Unable to get the file object for path %s.", gcsPath), + e); + } + } + + /** + * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. + */ + public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) + throws IOException { + // List all objects that start with the prefix (including objects in sub-directories). + Storage.Objects.List listObject = storageClient.objects().list(bucket); + listObject.setMaxResults(MAX_LIST_ITEMS_PER_CALL); + listObject.setPrefix(prefix); + + if (pageToken != null) { + listObject.setPageToken(pageToken); + } + + try { + return ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(listObject), + BACKOFF_FACTORY.backoff(), + RetryDeterminer.SOCKET_ERRORS, + IOException.class); + } catch (Exception e) { + throw new IOException( + String.format("Unable to match files in bucket %s, prefix %s.", bucket, prefix), + e); } } @@ -321,12 +316,12 @@ public class GcsUtil { */ @VisibleForTesting List<Long> fileSizes(Collection<GcsPath> paths) throws IOException { - List<long[]> results = Lists.newArrayList(); + List<StorageObject[]> results = Lists.newArrayList(); executeBatches(makeGetBatches(paths, results)); ImmutableList.Builder<Long> ret = ImmutableList.builder(); - for (long[] result : results) { - ret.add(result[0]); + for (StorageObject[] result : results) { + ret.add(result[0].getSize().longValue()); } return ret.build(); } @@ -542,7 +537,7 @@ public class GcsUtil { @VisibleForTesting List<BatchRequest> makeGetBatches( Collection<GcsPath> paths, - List<long[]> results) throws IOException { + List<StorageObject[]> results) throws IOException { List<BatchRequest> batches = new LinkedList<>(); for (List<GcsPath> filesToGet : Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { @@ -601,15 +596,16 @@ public class GcsUtil { executeBatches(makeRemoveBatches(filenames)); } - private long[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) throws IOException { - final long[] fileSize = new long[1]; + private StorageObject[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) + throws IOException { + final StorageObject[] storageObject = new StorageObject[1]; Storage.Objects.Get getRequest = storageClient.objects() .get(path.getBucket(), path.getObject()); getRequest.queue(batch, new JsonBatchCallback<StorageObject>() { @Override public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException { - fileSize[0] = response.getSize().longValue(); + storageObject[0] = response; } @Override @@ -621,7 +617,7 @@ public class GcsUtil { } } }); - return fileSize; + return storageObject; } private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch) http://git-wip-us.apache.org/repos/asf/beam/blob/51a8c37a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index d592761..920e593 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -320,7 +320,7 @@ public class GcsUtilTest { when(mockStorageGet.execute()).thenThrow(expectedException); thrown.expect(IOException.class); - thrown.expectMessage("Unable to match files for pattern"); + thrown.expectMessage("Unable to get the file object for path"); gcsUtil.expand(pattern); } @@ -381,8 +381,11 @@ public class GcsUtilTest { .thenThrow(new SocketTimeoutException("SocketException")) .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000))); - assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"), - mockBackOff, new FastNanoClockAndSleeper())); + assertEquals(1000, + gcsUtil.getObject( + GcsPath.fromComponents("testbucket", "testobject"), + mockBackOff, + new FastNanoClockAndSleeper()).getSize().longValue()); assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis()); } @@ -752,7 +755,7 @@ public class GcsUtilTest { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); // Small number of files fits in 1 batch - List<long[]> results = Lists.newArrayList(); + List<StorageObject[]> results = Lists.newArrayList(); List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3));
