Repository: beam
Updated Branches:
  refs/heads/master 2ca3bf669 -> 9df1da493
[BEAM-59] Beam GcsFileSystem: implementation of match(). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/36e87385 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/36e87385 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/36e87385 Branch: refs/heads/master Commit: 36e873858b00d1e1136833023646bf1d40c0466c Parents: 2ca3bf6 Author: Pei He <[email protected]> Authored: Thu Feb 16 11:26:45 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Thu Feb 16 22:45:06 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/util/GcsUtil.java | 9 ++- .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 62 +++++++++++++++++++- .../sdk/io/gcp/storage/GcsFileSystemTest.java | 50 ++++++++++++++++ 3 files changed, 119 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/36e87385/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index ea0cf9e..434baf5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -225,6 +225,13 @@ public class GcsUtil { return dst.toString(); } + /** + * Returns true if the given {@code spec} contains glob. 
+ */ + public static boolean isGlob(GcsPath spec) { + return GLOB_PREFIX.matcher(spec.getObject()).matches(); + } + private GcsUtil( Storage storageClient, HttpRequestInitializer httpRequestInitializer, @@ -250,7 +257,7 @@ public class GcsUtil { checkArgument(isGcsPatternSupported(gcsPattern.getObject())); Pattern p = null; String prefix = null; - if (!GLOB_PREFIX.matcher(gcsPattern.getObject()).matches()) { + if (!isGlob(gcsPattern)) { // Not a glob. try { // Use a get request to fetch the metadata of the object, and ignore the return value. http://git-wip-us.apache.org/repos/asf/beam/blob/36e87385/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java index b2a712d..fac1db3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.storage; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; @@ -25,12 +26,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; 
import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.regex.Pattern; @@ -60,7 +63,39 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { @Override protected List<MatchResult> match(List<String> specs) throws IOException { - throw new UnsupportedOperationException(); + List<GcsPath> gcsPaths = toGcsPaths(specs); + + List<GcsPath> globs = Lists.newArrayList(); + List<GcsPath> nonGlobs = Lists.newArrayList(); + List<Boolean> isGlobBooleans = Lists.newArrayList(); + + for (int i = 0; i < gcsPaths.size(); ++i) { + GcsPath path = gcsPaths.get(i); + if (GcsUtil.isGlob(path)) { + globs.add(path); + isGlobBooleans.add(true); + } else { + nonGlobs.add(path); + isGlobBooleans.add(false); + } + } + + Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator(); + Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator(); + + ImmutableList.Builder<MatchResult> ret = ImmutableList.builder(); + for (Boolean isGlob : isGlobBooleans) { + if (isGlob) { + checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next."); + ret.add(globsMatchResults.next()); + } else { + checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next."); + ret.add(nonGlobsMatchResults.next()); + } + } + checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults."); + checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults."); + return ret.build(); } @Override @@ -93,6 +128,21 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds)); } + private List<MatchResult> matchGlobs(List<GcsPath> globs) { + // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503. 
+ return FluentIterable.from(globs) + .transform(new Function<GcsPath, MatchResult>() { + @Override + public MatchResult apply(GcsPath gcsPath) { + try { + return expand(gcsPath); + } catch (IOException e) { + return MatchResult.create(Status.ERROR, e); + } + }}) + .toList(); + } + /** * Expands a pattern into {@link MatchResult}. * @@ -179,4 +229,14 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { }}) .toList(); } + + private List<GcsPath> toGcsPaths(Collection<String> specs) { + return FluentIterable.from(specs) + .transform(new Function<String, GcsPath>() { + @Override + public GcsPath apply(String spec) { + return GcsPath.fromUri(spec); + }}) + .toList(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/36e87385/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java index 8b8a788..b726552 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java @@ -74,6 +74,56 @@ public class GcsFileSystemTest { } @Test + public void testMatch() throws Exception { + Objects modelObjects = new Objects(); + List<StorageObject> items = new ArrayList<>(); + // A directory + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); + + // Files within the directory + items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */)); + 
items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */)); + items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */)); + + modelObjects.setItems(items); + when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class))) + .thenReturn(modelObjects); + + List<GcsPath> gcsPaths = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"), + GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); + + when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn( + ImmutableList.of( + StorageObjectOrIOException.create(new FileNotFoundException()), + StorageObjectOrIOException.create( + createStorageObject("gs://testbucket/testdirectory/otherfile", 4L)))); + + List<String> specs = ImmutableList.of( + "gs://testbucket/testdirectory/file[1-3]*", + "gs://testbucket/testdirectory/non-exist-file", + "gs://testbucket/testdirectory/otherfile"); + List<MatchResult> matchResults = gcsFileSystem.match(specs); + assertEquals(3, matchResults.size()); + assertEquals(Status.OK, matchResults.get(0).status()); + assertThat( + ImmutableList.of( + "gs://testbucket/testdirectory/file1name", + "gs://testbucket/testdirectory/file2name", + "gs://testbucket/testdirectory/file3name"), + contains(toFilenames(matchResults.get(0)).toArray())); + assertEquals(Status.NOT_FOUND, matchResults.get(1).status()); + assertEquals(Status.OK, matchResults.get(2).status()); + assertThat( + ImmutableList.of("gs://testbucket/testdirectory/otherfile"), + contains(toFilenames(matchResults.get(2)).toArray())); + + } + + @Test public void testGlobExpansion() throws IOException { Objects modelObjects = new Objects(); List<StorageObject> items = new ArrayList<>();
