Repository: beam Updated Branches: refs/heads/master d04a88e7e -> 763fb50b5
[BEAM-59] Beam GcsFileSystem: port fileSizes() from GcsUtil for batch get StorageObjects. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85f04085 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85f04085 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85f04085 Branch: refs/heads/master Commit: 85f04085dbfbe130cfc10e0b7451a4aa9d8d8df2 Parents: d04a88e Author: Pei He <[email protected]> Authored: Wed Feb 15 14:07:45 2017 -0800 Committer: Pei He <[email protected]> Committed: Thu Feb 16 15:17:09 2017 -0800 ---------------------------------------------------------------------- .../src/main/resources/beam/findbugs-filter.xml | 11 +++ .../java/org/apache/beam/sdk/util/GcsUtil.java | 85 +++++++++++++++++--- .../org/apache/beam/sdk/util/GcsUtilTest.java | 27 +++++-- .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 49 ++++++++--- .../sdk/io/gcp/storage/GcsFileSystemTest.java | 35 ++++++++ 5 files changed, 179 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index edbdb14..2ffd648 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -357,4 +357,15 @@ <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/> <!--[BEAM-421] Class doesn't override equals in superclass--> </Match> + <Match> + <Class name="org.apache.beam.sdk.util.AutoValue_GcsUtil_StorageObjectOrIOException"/> + <Bug pattern="NM_CLASS_NOT_EXCEPTION"/> + <!-- It is clear from the name that this class holds either StorageObject or IOException. 
--> + </Match> + + <Match> + <Class name="org.apache.beam.sdk.util.GcsUtil$StorageObjectOrIOException"/> + <Bug pattern="NM_CLASS_NOT_EXCEPTION"/> + <!-- It is clear from the name that this class holds either StorageObject or IOException. --> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 6345867..ea0cf9e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.client.googleapis.batch.BatchRequest; import com.google.api.client.googleapis.batch.json.JsonBatchCallback; @@ -31,6 +32,7 @@ import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; +import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; import com.google.cloud.hadoop.gcsio.ObjectWriteConditions; @@ -52,6 +54,7 @@ import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; @@ -336,6 +339,21 @@ public class GcsUtil { } /** + * Returns {@link StorageObjectOrIOException 
StorageObjectOrIOExceptions} for the given + * {@link GcsPath GcsPaths}. + */ + public List<StorageObjectOrIOException> getObjects(List<GcsPath> gcsPaths) + throws IOException { + List<StorageObjectOrIOException[]> results = new ArrayList<>(); + executeBatches(makeGetBatches(gcsPaths, results)); + ImmutableList.Builder<StorageObjectOrIOException> ret = ImmutableList.builder(); + for (StorageObjectOrIOException[] result : results) { + ret.add(result[0]); + } + return ret.build(); + } + + /** * Lists {@link Objects} given the {@code bucket}, {@code prefix}, {@code pageToken}. */ public Objects listObjects(String bucket, String prefix, @Nullable String pageToken) @@ -367,17 +385,25 @@ public class GcsUtil { * if the resource does not exist. */ @VisibleForTesting - List<Long> fileSizes(Collection<GcsPath> paths) throws IOException { - List<StorageObject[]> results = Lists.newArrayList(); - executeBatches(makeGetBatches(paths, results)); + List<Long> fileSizes(List<GcsPath> paths) throws IOException { + List<StorageObjectOrIOException> results = getObjects(paths); ImmutableList.Builder<Long> ret = ImmutableList.builder(); - for (StorageObject[] result : results) { - ret.add(result[0].getSize().longValue()); + for (StorageObjectOrIOException result : results) { + ret.add(toFileSize(result)); } return ret.build(); } + private Long toFileSize(StorageObjectOrIOException storageObjectOrIOException) + throws IOException { + if (storageObjectOrIOException.ioException() != null) { + throw storageObjectOrIOException.ioException(); + } else { + return storageObjectOrIOException.storageObject().getSize().longValue(); + } + } + /** * Opens an object in GCS. 
* @@ -589,7 +615,7 @@ public class GcsUtil { @VisibleForTesting List<BatchRequest> makeGetBatches( Collection<GcsPath> paths, - List<StorageObject[]> results) throws IOException { + List<StorageObjectOrIOException[]> results) throws IOException { List<BatchRequest> batches = new LinkedList<>(); for (List<GcsPath> filesToGet : Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { @@ -648,28 +674,63 @@ public class GcsUtil { executeBatches(makeRemoveBatches(filenames)); } - private StorageObject[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) + private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) throws IOException { - final StorageObject[] storageObject = new StorageObject[1]; + final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1]; Storage.Objects.Get getRequest = storageClient.objects() .get(path.getBucket(), path.getObject()); getRequest.queue(batch, new JsonBatchCallback<StorageObject>() { @Override public void onSuccess(StorageObject response, HttpHeaders httpHeaders) throws IOException { - storageObject[0] = response; + ret[0] = StorageObjectOrIOException.create(response); } @Override public void onFailure(GoogleJsonError e, HttpHeaders httpHeaders) throws IOException { + IOException ioException; if (errorExtractor.itemNotFound(e)) { - throw new FileNotFoundException(path.toString()); + ioException = new FileNotFoundException(path.toString()); } else { - throw new IOException(String.format("Error trying to get %s: %s", path, e)); + ioException = new IOException(String.format("Error trying to get %s: %s", path, e)); } + ret[0] = StorageObjectOrIOException.create(ioException); } }); - return storageObject; + return ret; + } + + /** + * A class that holds either a {@link StorageObject} or an {@link IOException}. + */ + @AutoValue + public abstract static class StorageObjectOrIOException { + + /** + * Returns the {@link StorageObject}. 
+ */ + @Nullable + public abstract StorageObject storageObject(); + + /** + * Returns the {@link IOException}. + */ + @Nullable + public abstract IOException ioException(); + + @VisibleForTesting + public static StorageObjectOrIOException create(StorageObject storageObject) { + return new AutoValue_GcsUtil_StorageObjectOrIOException( + checkNotNull(storageObject, "storageObject"), + null /* ioException */); + } + + @VisibleForTesting + public static StorageObjectOrIOException create(IOException ioException) { + return new AutoValue_GcsUtil_StorageObjectOrIOException( + null /* storageObject */, + checkNotNull(ioException, "ioException")); + } } private void enqueueCopy(final GcsPath from, final GcsPath to, BatchRequest batch) http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 920e593..03668ce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -76,6 +76,7 @@ import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.junit.Rule; import org.junit.Test; @@ -394,18 +395,24 @@ public class GcsUtilTest { JsonFactory jsonFactory = new JacksonFactory(); String contentBoundary = "batch_foobarbaz"; + String contentBoundaryLine = "--" + contentBoundary; + String endOfContentBoundaryLine = "--" + contentBoundary + "--"; GenericJson error = new GenericJson() .set("error", new 
GenericJson().set("code", 404)); error.setFactory(jsonFactory); - String content = contentBoundary + "\n" + String content = contentBoundaryLine + "\n" + "Content-Type: application/http\n" + "\n" + "HTTP/1.1 404 Not Found\n" - + "Content-Length: 105\n" + + "Content-Length: -1\n" + "\n" - + error.toString(); + + error.toString() + + "\n" + + "\n" + + endOfContentBoundaryLine + + "\n"; thrown.expect(FileNotFoundException.class); MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse() .setContentType("multipart/mixed; boundary=" + contentBoundary) @@ -426,18 +433,24 @@ public class GcsUtilTest { JsonFactory jsonFactory = new JacksonFactory(); String contentBoundary = "batch_foobarbaz"; + String contentBoundaryLine = "--" + contentBoundary; + String endOfContentBoundaryLine = "--" + contentBoundary + "--"; GenericJson error = new GenericJson() .set("error", new GenericJson().set("code", 404)); error.setFactory(jsonFactory); - String content = contentBoundary + "\n" + String content = contentBoundaryLine + "\n" + "Content-Type: application/http\n" + "\n" + "HTTP/1.1 404 Not Found\n" - + "Content-Length: 105\n" + + "Content-Length: -1\n" + + "\n" + + error.toString() + + "\n" + "\n" - + error.toString(); + + endOfContentBoundaryLine + + "\n"; thrown.expect(FileNotFoundException.class); final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); @@ -755,7 +768,7 @@ public class GcsUtilTest { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); // Small number of files fits in 1 batch - List<StorageObject[]> results = Lists.newArrayList(); + List<StorageObjectOrIOException[]> results = Lists.newArrayList(); List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3)); 
http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java index 1811fec..b2a712d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java @@ -24,6 +24,8 @@ import com.google.api.services.storage.model.StorageObject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; import java.nio.channels.ReadableByteChannel; @@ -39,6 +41,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,15 +128,32 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { return MatchResult.create(Status.OK, results.toArray(new Metadata[results.size()])); } - private List<String> toFilenames(Collection<GcsResourceId> resources) { - return FluentIterable.from(resources) - .transform( - new Function<GcsResourceId, String>() { - @Override - public String apply(GcsResourceId resource) { - return resource.getGcsPath().toString(); - }}) - .toList(); + /** + * Returns {@link MatchResult 
MatchResults} for the given {@link GcsPath GcsPaths}. + * + *<p>The number of returned {@link MatchResult MatchResults} equals the number of given + * {@link GcsPath GcsPaths}. Each {@link MatchResult} contains one {@link Metadata}. + */ + @VisibleForTesting + List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException { + List<StorageObjectOrIOException> results = options.getGcsUtil().getObjects(gcsPaths); + + ImmutableList.Builder<MatchResult> ret = ImmutableList.builder(); + for (StorageObjectOrIOException result : results) { + ret.add(toMatchResult(result)); + } + return ret.build(); + } + + private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) { + if (objectOrException.ioException() instanceof FileNotFoundException) { + return MatchResult.create(Status.NOT_FOUND, objectOrException.ioException()); + } else if (objectOrException.ioException() != null) { + return MatchResult.create(Status.ERROR, objectOrException.ioException()); + } else { + return MatchResult.create( + Status.OK, new Metadata[]{toMetadata(objectOrException.storageObject())}); + } } private Metadata toMetadata(StorageObject storageObject) { @@ -148,4 +168,15 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { } return ret.build(); } + + private List<String> toFilenames(Collection<GcsResourceId> resources) { + return FluentIterable.from(resources) + .transform( + new Function<GcsResourceId, String>() { + @Override + public String apply(GcsResourceId resource) { + return resource.getGcsPath().toString(); + }}) + .toList(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/85f04085/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java index 4deb7b3..8b8a788 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.storage; import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -29,15 +30,18 @@ import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; +import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.junit.Before; import org.junit.Rule; @@ -168,6 +172,37 @@ public class GcsFileSystemTest { gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**")); } + @Test + public void testMatchNonGlobs() throws Exception { + List<StorageObjectOrIOException> items = new ArrayList<>(); + // Files within the directory + items.add(StorageObjectOrIOException.create( + createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */))); + items.add(StorageObjectOrIOException.create(new FileNotFoundException())); + 
items.add(StorageObjectOrIOException.create(new IOException())); + items.add(StorageObjectOrIOException.create( + createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */))); + + List<GcsPath> gcsPaths = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file3name"), + GcsPath.fromUri("gs://testbucket/testdirectory/file4name")); + + when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items); + List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths); + + assertEquals(4, matchResults.size()); + assertThat( + ImmutableList.of("gs://testbucket/testdirectory/file1name"), + contains(toFilenames(matchResults.get(0)).toArray())); + assertEquals(Status.NOT_FOUND, matchResults.get(1).status()); + assertEquals(Status.ERROR, matchResults.get(2).status()); + assertThat( + ImmutableList.of("gs://testbucket/testdirectory/file4name"), + contains(toFilenames(matchResults.get(3)).toArray())); + } + private StorageObject createStorageObject(String gcsFilename, long fileSize) { GcsPath gcsPath = GcsPath.fromUri(gcsFilename); return new StorageObject()
