Repository: beam Updated Branches: refs/heads/master 62ee27542 -> 15bd3a394
[BEAM-2150] Relax regex to support wildcard globbing for GCS Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/316ff6bf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/316ff6bf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/316ff6bf Branch: refs/heads/master Commit: 316ff6bf9f472687667482805062b0aa6417aa8f Parents: 62ee275 Author: Devon Meunier <[email protected]> Authored: Wed May 3 16:22:16 2017 -0400 Committer: Dan Halperin <[email protected]> Committed: Wed May 10 18:59:30 2017 -0700 ---------------------------------------------------------------------- .../extensions/gcp/storage/GcsFileSystem.java | 6 +-- .../gcp/storage/GcsPathValidator.java | 4 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 46 +++++----------- .../gcp/storage/GcsFileSystemTest.java | 8 --- .../org/apache/beam/sdk/util/GcsUtilTest.java | 57 ++++++++++++++++---- 5 files changed, 63 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/316ff6bf/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index 9052a5a..6db0a01 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -73,7 +73,7 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { List<Boolean> isGlobBooleans = Lists.newArrayList(); for (GcsPath path : gcsPaths) { - if (GcsUtil.isGlob(path)) { + if (GcsUtil.isWildcard(path)) { globs.add(path); isGlobBooleans.add(true); } else { @@ -178,8 +178,8 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { */ @VisibleForTesting MatchResult expand(GcsPath gcsPattern) throws IOException { - String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject()); - Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject())); + String prefix = GcsUtil.getNonWildcardPrefix(gcsPattern.getObject()); + Pattern p = Pattern.compile(GcsUtil.wildcardToRegexp(gcsPattern.getObject())); LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), prefix, p.toString()); http://git-wip-us.apache.org/repos/asf/beam/blob/316ff6bf/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java index e7257b2..6b10bfb 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java @@ -23,7 +23,6 @@ import java.io.IOException; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.gcsfs.GcsPath; /** @@ -47,8 +46,7 @@ public class GcsPathValidator implements PathValidator { */ @Override public void validateInputFilePatternSupported(String filepattern) { - GcsPath gcsPath = getGcsPath(filepattern); - checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject())); + getGcsPath(filepattern); verifyPath(filepattern); verifyPathIsAccessible(filepattern, "Could not find file %s"); } http://git-wip-us.apache.org/repos/asf/beam/blob/316ff6bf/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 18e3e2b..94b733a 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -125,14 +125,6 @@ public class GcsUtil { /** Matches a glob containing a wildcard, capturing the portion before the first wildcard. */ private static final Pattern GLOB_PREFIX = Pattern.compile("(?<PREFIX>[^\\[*?]*)[\\[*?].*"); - private static final String RECURSIVE_WILDCARD = "[*]{2}"; - - /** - * A {@link Pattern} for globs with a recursive wildcard. - */ - private static final Pattern RECURSIVE_GCS_PATTERN = - Pattern.compile(".*" + RECURSIVE_WILDCARD + ".*"); - /** * Maximum number of requests permitted in a GCS batch request. */ @@ -160,22 +152,9 @@ public class GcsUtil { final ExecutorService executorService; /** - * Returns true if the given GCS pattern is supported otherwise fails with an - * exception. - */ - public static boolean isGcsPatternSupported(String gcsPattern) { - if (RECURSIVE_GCS_PATTERN.matcher(gcsPattern).matches()) { - throw new IllegalArgumentException("Unsupported wildcard usage in \"" + gcsPattern + "\": " - + " recursive wildcards are not supported."); - } - return true; - } - - /** * Returns the prefix portion of the glob that doesn't contain wildcards. */ - public static String getGlobPrefix(String globExp) { - checkArgument(isGcsPatternSupported(globExp)); + public static String getNonWildcardPrefix(String globExp) { Matcher m = GLOB_PREFIX.matcher(globExp); checkArgument( m.matches(), @@ -189,15 +168,15 @@ public class GcsUtil { * @param globExp the glob expression to expand * @return a string with the regular expression this glob expands to */ - public static String globToRegexp(String globExp) { + public static String wildcardToRegexp(String globExp) { StringBuilder dst = new StringBuilder(); - char[] src = globExp.toCharArray(); + char[] src = globExp.replace("**/*", "**").toCharArray(); int i = 0; while (i < src.length) { char c = src[i++]; switch (c) { case '*': - dst.append("[^/]*"); + dst.append(".*"); break; case '?': dst.append("[^/]"); @@ -226,9 +205,9 @@ public class GcsUtil { } /** - * Returns true if the given {@code spec} contains glob. + * Returns true if the given {@code spec} contains wildcard. */ - public static boolean isGlob(GcsPath spec) { + public static boolean isWildcard(GcsPath spec) { return GLOB_PREFIX.matcher(spec.getObject()).matches(); } @@ -254,11 +233,14 @@ public class GcsUtil { * exists. */ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException { - checkArgument(isGcsPatternSupported(gcsPattern.getObject())); Pattern p = null; String prefix = null; - if (!isGlob(gcsPattern)) { - // Not a glob. + if (isWildcard(gcsPattern)) { + // Part before the first wildcard character. + prefix = getNonWildcardPrefix(gcsPattern.getObject()); + p = Pattern.compile(wildcardToRegexp(gcsPattern.getObject())); + } else { + // Not a wildcard. try { // Use a get request to fetch the metadata of the object, and ignore the return value. // The request has strong global consistency. @@ -268,10 +250,6 @@ public class GcsUtil { // If the path was not found, return an empty list. return ImmutableList.of(); } - } else { - // Part before the first wildcard character. - prefix = getGlobPrefix(gcsPattern.getObject()); - p = Pattern.compile(globToRegexp(gcsPattern.getObject())); } LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(), http://git-wip-us.apache.org/repos/asf/beam/blob/316ff6bf/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java index 03194d2..ce6bbba 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java @@ -214,14 +214,6 @@ public class GcsFileSystemTest { gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); } - // Patterns that contain recursive wildcards ('**') are not supported. - @Test - public void testRecursiveGlobExpansionFails() throws IOException { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unsupported wildcard usage"); - gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**")); - } - @Test public void testMatchNonGlobs() throws Exception { List<StorageObjectOrIOException> items = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/316ff6bf/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 0af584e..5326450 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -92,10 +92,12 @@ public class GcsUtilTest { @Test public void testGlobTranslation() { - assertEquals("foo", GcsUtil.globToRegexp("foo")); - assertEquals("fo[^/]*o", GcsUtil.globToRegexp("fo*o")); - assertEquals("f[^/]*o\\.[^/]", GcsUtil.globToRegexp("f*o.?")); - assertEquals("foo-[0-9][^/]*", GcsUtil.globToRegexp("foo-[0-9]*")); + assertEquals("foo", GcsUtil.wildcardToRegexp("foo")); + assertEquals("fo.*o", GcsUtil.wildcardToRegexp("fo*o")); + assertEquals("f.*o\\.[^/]", GcsUtil.wildcardToRegexp("f*o.?")); + assertEquals("foo-[0-9].*", GcsUtil.wildcardToRegexp("foo-[0-9]*")); + assertEquals(".*.*foo", GcsUtil.wildcardToRegexp("**/*foo")); + assertEquals(".*.*foo", GcsUtil.wildcardToRegexp("**foo")); } private static GcsOptions gcsOptionsWithTestCredential() { @@ -260,16 +262,51 @@ public class GcsUtilTest { } } - // Patterns that contain recursive wildcards ('**') are not supported. @Test - public void testRecursiveGlobExpansionFails() throws IOException { + public void testRecursiveGlobExpansion() throws IOException { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); - GcsPath pattern = GcsPath.fromUri("gs://testbucket/test**"); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unsupported wildcard usage"); - gcsUtil.expand(pattern); + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); + Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class); + + Objects modelObjects = new Objects(); + List<StorageObject> items = new ArrayList<>(); + // A directory + items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); + + // Files within the directory + items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file1.txt")); + items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file2.txt")); + items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file3.txt")); + items.add(new StorageObject().setBucket("testbucket").setName("test/directory/otherfile")); + items.add(new StorageObject().setBucket("testbucket").setName("test/directory/anotherfile")); + items.add(new StorageObject().setBucket("testbucket").setName("test/file4.txt")); + + modelObjects.setItems(items); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.get("testbucket", "test/directory/otherfile")).thenReturn( + mockStorageGet); + when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList); + when(mockStorageGet.execute()).thenReturn( + new StorageObject().setBucket("testbucket").setName("test/directory/otherfile")); + when(mockStorageList.execute()).thenReturn(modelObjects); + + { + GcsPath pattern = GcsPath.fromUri("gs://testbucket/test/**/*.txt"); + List<GcsPath> expectedFiles = ImmutableList.of( + GcsPath.fromUri("gs://testbucket/test/directory/file1.txt"), + GcsPath.fromUri("gs://testbucket/test/directory/file2.txt"), + GcsPath.fromUri("gs://testbucket/test/directory/file3.txt"), + GcsPath.fromUri("gs://testbucket/test/file4.txt")); + + assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); + } } // GCSUtil.expand() should fail when matching a single object when that object does not exist.
