Repository: beam Updated Branches: refs/heads/master de0148c3e -> d2133d34b
Add local filesystem as default and strip away prefix from local files in pipeline options. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5017bfc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5017bfc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5017bfc Branch: refs/heads/master Commit: b5017bfc85fdf5bb7cec4d2fe52963e9db32ed18 Parents: de0148c Author: Flavio Fiszman <flavi...@flaviocf-macbookpro.roam.corp.google.com> Authored: Fri Jun 30 10:14:34 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Thu Aug 3 06:34:02 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileSystems.java | 17 ++++--- .../org/apache/beam/sdk/io/LocalFileSystem.java | 36 +++++++++++++- .../apache/beam/sdk/io/LocalFileSystemTest.java | 49 ++++++++++++++++++++ 3 files changed, 92 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b5017bfc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 2ed29e3..bd4668f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -69,13 +69,13 @@ import org.apache.beam.sdk.values.KV; @Experimental(Kind.FILESYSTEM) public class FileSystems { - public static final String DEFAULT_SCHEME = "default"; + public static final String DEFAULT_SCHEME = "file"; private static final Pattern FILE_SCHEME_PATTERN = Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*"); private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM = new 
AtomicReference<Map<String, FileSystem>>( - ImmutableMap.<String, FileSystem>of("file", new LocalFileSystem())); + ImmutableMap.<String, FileSystem>of(DEFAULT_SCHEME, new LocalFileSystem())); /********************************** METHODS FOR CLIENT **********************************/ @@ -99,6 +99,9 @@ public class FileSystems { * component of {@link ResourceId}. This allows SDK libraries to construct file system agnostic * spec. {@link FileSystem FileSystems} can support additional patterns for user-provided specs. * + * <p>In case the spec schemes don't match any known {@link FileSystem} implementations, + * FileSystems will attempt to use {@link LocalFileSystem} to resolve a path. + * * @return {@code List<MatchResult>} in the same order of the input specs. * * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes. @@ -176,7 +179,7 @@ public class FileSystems { .transform(new Function<ResourceId, String>() { @Override public String apply(@Nonnull ResourceId resourceId) { - return resourceId.toString(); + return resourceId.toString(); }}) .toList()); } @@ -423,7 +426,7 @@ public class FileSystems { Matcher matcher = FILE_SCHEME_PATTERN.matcher(spec); if (!matcher.matches()) { - return "file"; + return DEFAULT_SCHEME; } else { return matcher.group("scheme").toLowerCase(); } @@ -440,11 +443,7 @@ public class FileSystems { if (rval != null) { return rval; } - rval = schemeToFileSystem.get(DEFAULT_SCHEME); - if (rval != null) { - return rval; - } - throw new IllegalStateException("Unable to find registrar for " + scheme); + return schemeToFileSystem.get(DEFAULT_SCHEME); } /********************************** METHODS FOR REGISTRATION **********************************/ http://git-wip-us.apache.org/repos/asf/beam/blob/b5017bfc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index b732bee..5fe894d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -38,6 +38,7 @@ import java.nio.file.Path; import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -46,11 +47,32 @@ import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.commons.lang3.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * {@link FileSystem} implementation for local files. + * + * <p>{@link #match} should interpret {@code spec} and resolve paths correctly according to the OS being
In order to do that specs should be defined in one of the below formats: + * + * <p>Linux/Mac: + * <ul> + * <li>pom.xml</li> + * <li>/Users/beam/Documents/pom.xml</li> + * <li>file:/Users/beam/Documents/pom.xml</li> + * <li>file:///Users/beam/Documents/pom.xml</li> + * </ul> + * + * <p>Windows OS: + * <ul> + * <li>pom.xml</li> + * <li>C:/Users/beam/Documents/pom.xml</li> + * <li>C:\\Users\\beam\\Documents\\pom.xml</li> + * <li>file:/C:/Users/beam/Documents/pom.xml</li> + * <li>file:///C:/Users/beam/Documents/pom.xml</li> + * </ul> */ class LocalFileSystem extends FileSystem<LocalResourceId> { @@ -176,8 +198,20 @@ class LocalFileSystem extends FileSystem<LocalResourceId> { } private MatchResult matchOne(String spec) throws IOException { - File file = Paths.get(spec).toFile(); + if (spec.toLowerCase().startsWith("file:")) { + spec = spec.substring("file:".length()); + } + if (SystemUtils.IS_OS_WINDOWS) { + List<String> prefixes = Arrays.asList("///", "/"); + for (String prefix : prefixes) { + if (spec.toLowerCase().startsWith(prefix)) { + spec = spec.substring(prefix.length()); + } + } + } + + File file = Paths.get(spec).toFile(); if (file.exists()) { return MatchResult.create(Status.OK, ImmutableList.of(toMetadata(file))); } http://git-wip-us.apache.org/repos/asf/beam/blob/b5017bfc/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java index 048908f..aaaeb83 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java @@ -45,7 +45,9 @@ import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; import 
org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.testing.RestoreSystemProperties; import org.apache.beam.sdk.util.MimeTypes; +import org.apache.commons.lang3.SystemUtils; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -61,6 +63,7 @@ import org.junit.runners.JUnit4; public class LocalFileSystemTest { @Rule public ExpectedException thrown = ExpectedException.none(); @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); private LocalFileSystem localFileSystem = new LocalFileSystem(); @Test @@ -242,6 +245,52 @@ public class LocalFileSystemTest { } @Test + public void testMatchInDirectory() throws Exception { + List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString()); + temporaryFolder.newFile("aa"); + temporaryFolder.newFile("ab"); + + String expectedFile = expected.get(0); + int slashIndex = expectedFile.lastIndexOf('/'); + if (SystemUtils.IS_OS_WINDOWS) { + slashIndex = expectedFile.lastIndexOf('\\'); + } + String directory = expectedFile.substring(0, slashIndex); + String relative = expectedFile.substring(slashIndex + 1); + System.setProperty("user.dir", directory); + List<MatchResult> results = localFileSystem.match(ImmutableList.of(relative)); + assertThat( + toFilenames(results), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchWithFileSlashPrefix() throws Exception { + List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString()); + temporaryFolder.newFile("aa"); + temporaryFolder.newFile("ab"); + + String file = "file:/" + temporaryFolder.getRoot().toPath().resolve("a").toString(); + List<MatchResult> results = localFileSystem.match(ImmutableList.of(file)); + assertThat( + toFilenames(results), + 
containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchWithFileThreeSlashesPrefix() throws Exception { + List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString()); + temporaryFolder.newFile("aa"); + temporaryFolder.newFile("ab"); + + String file = "file:///" + temporaryFolder.getRoot().toPath().resolve("a").toString(); + List<MatchResult> results = localFileSystem.match(ImmutableList.of(file)); + assertThat( + toFilenames(results), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test public void testMatchMultipleWithoutSubdirectoryExpansion() throws Exception { File unmatchedSubDir = temporaryFolder.newFolder("aaa"); File unmatchedSubDirFile = File.createTempFile("sub-dir-file", "", unmatchedSubDir);