Repository: beam Updated Branches: refs/heads/master 49809d1d4 -> bea101a44
[BEAM-59] Beam FileSystem: match() and its local implementation. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1648c47 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1648c47 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1648c47 Branch: refs/heads/master Commit: d1648c47dd4fef00273ceb46d42d784325b3b1e8 Parents: 49809d1 Author: Pei He <pe...@google.com> Authored: Fri Feb 10 21:53:31 2017 -0800 Committer: Pei He <pe...@google.com> Committed: Mon Feb 13 22:08:50 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/FileSystem.java | 30 ++++ .../org/apache/beam/sdk/io/LocalFileSystem.java | 74 ++++++++++ .../org/apache/beam/sdk/io/fs/MatchResult.java | 125 ++++++++++++++++ .../apache/beam/sdk/io/LocalFileSystemTest.java | 148 +++++++++++++++++++ .../beam/sdk/util/FileIOChannelFactoryTest.java | 13 +- .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 6 + .../beam/sdk/io/hdfs/HadoopFileSystem.java | 6 + 7 files changed, 391 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index ecfa29b..001f596 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -24,6 +24,7 @@ import java.nio.channels.WritableByteChannel; import java.util.Collection; import java.util.List; import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.ResourceId; /** @@ -35,6 +36,35 @@ import 
org.apache.beam.sdk.io.fs.ResourceId; * Clients should use {@link FileSystems} utility. */ public abstract class FileSystem<ResourceIdT extends ResourceId> { + /** + * This is the entry point to convert user-provided specs to {@link ResourceIdT ResourceIds}. + * Callers should use {@link #match} to resolve ambiguities in user-provided specs before + * calling other methods. + * + * <p>Implementations should handle the following ambiguities of a user-provided spec: + * <ol> + * <li>{@code spec} could be a glob or a URI. {@link #match} should be able to tell which and + * choose an efficient implementation. + * <li>The user-provided {@code spec} might refer to files or directories. It is common that + * users who wish to indicate a directory will omit the trailing {@code /}, such as in a spec of + * {@code "/tmp/dir"}. The {@link FileSystem} should be able to recognize a directory with + * the trailing {@code /} omitted, but should always return a correct {@link ResourceIdT} + * (e.g., {@code "/tmp/dir/"}) inside the returned {@link MatchResult}. + * </ol> + * + * <p>All {@link FileSystem} implementations should support globs in the final hierarchical path + * component of {@link ResourceIdT}. This allows SDK libraries to construct file-system-agnostic + * specs. {@link FileSystem FileSystems} can support additional patterns for user-provided specs. + * + * @return {@code List<MatchResult>} in the same order as the input specs. + * + * @throws IllegalArgumentException if specs are invalid. + * @throws IOException if all specs failed to match due to issues like: + * network connection, authorization. + * Exceptions for individual specs need to be deferred until callers retrieve + * metadata with {@link MatchResult#metadata()}. + */ + protected abstract List<MatchResult> match(List<String> specs) throws IOException; /** * Returns a write channel for the given {@link ResourceIdT}.
http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index 0e79c9c..fe6b643 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -19,6 +19,11 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; @@ -29,10 +34,16 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.PathMatcher; +import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Collection; import java.util.List; +import java.util.regex.Matcher; import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +54,21 @@ class LocalFileSystem extends FileSystem<LocalResourceId> { private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class); + private static final Metadata[] EMPTY_METADATA = new Metadata[0]; + LocalFileSystem() { } @Override + protected List<MatchResult> match(List<String> specs) throws IOException { + ImmutableList.Builder<MatchResult> ret = 
ImmutableList.builder(); + for (String spec : specs) { + ret.add(matchOne(spec)); + } + return ret.build(); + } + + @Override protected WritableByteChannel create(LocalResourceId resourceId, CreateOptions createOptions) throws IOException { LOG.debug("creating file {}", resourceId); @@ -143,4 +165,56 @@ class LocalFileSystem extends FileSystem<LocalResourceId> { } } } + + private MatchResult matchOne(String spec) throws IOException { + File file = Paths.get(spec).toFile(); + + if (file.exists()) { + return MatchResult.create(Status.OK, new Metadata[]{toMetadata(file)}); + } + + File parent = file.getAbsoluteFile().getParentFile(); + if (!parent.exists()) { + return MatchResult.create(Status.NOT_FOUND, EMPTY_METADATA); + } + + // Method getAbsolutePath() on Windows platform may return something like + // "c:\temp\file.txt". FileSystem.getPathMatcher() call below will treat + // '\' (backslash) as an escape character, instead of a directory + // separator. Replacing backslash with double-backslash solves the problem. + // We perform the replacement on all platforms, even those that allow + // backslash as a part of the filename, because Globs.toRegexPattern will + // eat one backslash. 
+ String pathToMatch = file.getAbsolutePath().replaceAll(Matcher.quoteReplacement("\\"), + Matcher.quoteReplacement("\\\\")); + + final PathMatcher matcher = + java.nio.file.FileSystems.getDefault().getPathMatcher("glob:" + pathToMatch); + + // TODO: Avoid iterating all files: https://issues.apache.org/jira/browse/BEAM-1309 + Iterable<File> files = com.google.common.io.Files.fileTreeTraverser().preOrderTraversal(parent); + Iterable<File> matchedFiles = Iterables.filter(files, + Predicates.and( + com.google.common.io.Files.isFile(), + new Predicate<File>() { + @Override + public boolean apply(File input) { + return matcher.matches(input.toPath()); + } + })); + + List<Metadata> result = Lists.newLinkedList(); + for (File match : matchedFiles) { + result.add(toMetadata(match)); + } + return MatchResult.create(Status.OK, result.toArray(new Metadata[result.size()])); + } + + private Metadata toMetadata(File file) { + return Metadata.builder() + .setResourceId(LocalResourceId.fromPath(file.toPath(), file.isDirectory())) + .setIsReadSeekEfficient(true) + .setSizeBytes(file.length()) + .build(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java new file mode 100644 index 0000000..80ee00f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import com.google.auto.value.AutoValue; +import java.io.IOException; + +/** + * The result of {@link org.apache.beam.sdk.io.FileSystem#match}. + */ +public abstract class MatchResult { + + private MatchResult() {} + + /** + * Returns a {@link MatchResult} given the {@link Status} and {@link Metadata}. + */ + public static MatchResult create(final Status status, final Metadata[] metadata) { + return new MatchResult() { + @Override + public Status status() { + return status; + } + + @Override + public Metadata[] metadata() throws IOException { + return metadata; + } + }; + } + + /** + * Returns a {@link MatchResult} given the {@link Status} and {@link IOException}. + */ + public static MatchResult create(final Status status, final IOException e) { + return new MatchResult() { + @Override + public Status status() { + return status; + } + + @Override + public Metadata[] metadata() throws IOException { + throw e; + } + }; + } + + /** + * Returns a {@link MatchResult} with {@link Status#UNKNOWN}. + */ + public static MatchResult unknown() { + return new MatchResult() { + @Override + public Status status() { + return Status.UNKNOWN; + } + + @Override + public Metadata[] metadata() throws IOException { + throw new IOException("MatchResult status is UNKNOWN, and metadata is not available."); + } + }; + } + + /** + * Status of the {@link MatchResult}. 
+ */ + public abstract Status status(); + + /** + * {@link Metadata} of matched files. + */ + public abstract Metadata[] metadata() throws IOException; + + /** + * {@link Metadata} of a matched file. + */ + @AutoValue + public abstract static class Metadata { + public abstract ResourceId resourceId(); + public abstract long sizeBytes(); + public abstract boolean isReadSeekEfficient(); + + public static Builder builder() { + return new AutoValue_MatchResult_Metadata.Builder(); + } + + /** + * Builder class for {@link Metadata}. + */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setResourceId(ResourceId value); + public abstract Builder setSizeBytes(long value); + public abstract Builder setIsReadSeekEfficient(boolean value); + public abstract Metadata build(); + } + } + + /** + * Status of a {@link MatchResult}. + */ + public enum Status { + UNKNOWN, + OK, + NOT_FOUND, + ERROR, + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java index ad9b8a0..74f8b72 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java @@ -30,15 +30,21 @@ import com.google.common.io.Files; import com.google.common.io.LineReader; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.io.Reader; import java.io.Writer; import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; +import 
org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.util.MimeTypes; +import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -147,6 +153,122 @@ public class LocalFileSystemTest { } } + @Test + public void testMatchExact() throws Exception { + List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString()); + temporaryFolder.newFile("aa"); + temporaryFolder.newFile("ab"); + + List<MatchResult> matchResults = localFileSystem.match( + ImmutableList.of(temporaryFolder.getRoot().toPath().resolve("a").toString())); + assertThat( + toFilenames(matchResults), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchPatternNone() throws Exception { + List<String> expected = ImmutableList.of(); + temporaryFolder.newFile("a"); + temporaryFolder.newFile("aa"); + temporaryFolder.newFile("ab"); + + List<MatchResult> matchResults = + matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("b"), "*"); + assertThat( + toFilenames(matchResults), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchForNonExistentFile() throws Exception { + List<String> expected = ImmutableList.of(); + temporaryFolder.newFile("aa"); + + List<MatchResult> matchResults = localFileSystem.match( + ImmutableList.of(temporaryFolder.getRoot().toPath().resolve("a").toString())); + assertThat( + toFilenames(matchResults), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchMultipleWithFileExtension() throws Exception { + List<String> expected = ImmutableList.of( + temporaryFolder.newFile("a.txt").toString(), + temporaryFolder.newFile("aa.txt").toString(), + temporaryFolder.newFile("ab.txt").toString()); + 
temporaryFolder.newFile("a.avro"); + temporaryFolder.newFile("ab.avro"); + + List<MatchResult> matchResults = + matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), "*.txt"); + assertThat( + toFilenames(matchResults), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchMultipleWithoutSubdirectoryExpansion() throws Exception { + File unmatchedSubDir = temporaryFolder.newFolder("aaa"); + File unmatchedSubDirFile = File.createTempFile("sub-dir-file", "", unmatchedSubDir); + unmatchedSubDirFile.deleteOnExit(); + List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString(), + temporaryFolder.newFile("aa").toString(), temporaryFolder.newFile("ab").toString()); + temporaryFolder.newFile("ba"); + temporaryFolder.newFile("bb"); + + List<MatchResult> matchResults = + matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), "*"); + assertThat( + toFilenames(matchResults), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchMultipleWithSubdirectoryExpansion() throws Exception { + File matchedSubDir = temporaryFolder.newFolder("a"); + File matchedSubDirFile = File.createTempFile("sub-dir-file", "", matchedSubDir); + matchedSubDirFile.deleteOnExit(); + File unmatchedSubDir = temporaryFolder.newFolder("b"); + File unmatchedSubDirFile = File.createTempFile("sub-dir-file", "", unmatchedSubDir); + unmatchedSubDirFile.deleteOnExit(); + + List<String> expected = ImmutableList.of(matchedSubDirFile.toString(), + temporaryFolder.newFile("aa").toString(), temporaryFolder.newFile("ab").toString()); + temporaryFolder.newFile("ba"); + temporaryFolder.newFile("bb"); + + List<MatchResult> matchResults = + matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), "**"); + assertThat( + toFilenames(matchResults), + Matchers.hasItems(expected.toArray(new String[expected.size()]))); + } + + @Test + public void 
testMatchWithDirectoryFiltersOutDirectory() throws Exception { + List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString()); + temporaryFolder.newFolder("a_dir_that_should_not_be_matched"); + + List<MatchResult> matchResults = + matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), "*"); + assertThat( + toFilenames(matchResults), + containsInAnyOrder(expected.toArray(new String[expected.size()]))); + } + + @Test + public void testMatchWithoutParentDirectory() throws Exception { + Path pattern = LocalResourceId + .fromPath(temporaryFolder.getRoot().toPath(), true /* isDirectory */) + .resolve("non_existing_dir", StandardResolveOptions.RESOLVE_DIRECTORY) + .resolve("*", StandardResolveOptions.RESOLVE_FILE) + .getPath(); + assertTrue( + toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty()); + } + private void createFileWithContent(Path path, String content) throws Exception { try (Writer writer = Channels.newWriter( localFileSystem.create( @@ -157,6 +279,12 @@ public class LocalFileSystemTest { } } + private List<MatchResult> matchGlobWithPathPrefix(Path pathPrefix, String glob) + throws IOException { + // Windows doesn't like resolving paths with * in glob, so the glob is concatenated as String. 
+ return localFileSystem.match(ImmutableList.of(pathPrefix + glob)); + } + private List<LocalResourceId> toLocalResourceIds(List<Path> paths, final boolean isDirectory) { return FluentIterable .from(paths) @@ -167,4 +295,24 @@ public class LocalFileSystemTest { }}) .toList(); } + + private List<String> toFilenames(List<MatchResult> matchResults) { + return FluentIterable + .from(matchResults) + .transformAndConcat(new Function<MatchResult, Iterable<Metadata>>() { + @Override + public Iterable<Metadata> apply(MatchResult matchResult) { + try { + return Arrays.asList(matchResult.metadata()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }}) + .transform(new Function<Metadata, String>() { + @Override + public String apply(Metadata metadata) { + return ((LocalResourceId) metadata.resourceId()).getPath().toString(); + }}) + .toList(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java index 38be65a..6062619 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java @@ -130,7 +130,7 @@ public class FileIOChannelFactoryTest { } @Test - public void testMatchNone() throws Exception { + public void testMatchPatternNone() throws Exception { List<String> expected = ImmutableList.of(); temporaryFolder.newFile("a"); temporaryFolder.newFile("aa"); @@ -142,16 +142,7 @@ public class FileIOChannelFactoryTest { } @Test - public void testMatchUsingExplicitPath() throws Exception { - List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString()); - 
temporaryFolder.newFile("aa"); - - assertThat(factory.match(factory.resolve(temporaryFolder.getRoot().getPath(), "a")), - containsInAnyOrder(expected.toArray(new String[expected.size()]))); - } - - @Test - public void testMatchUsingExplicitPathForNonExistentFile() throws Exception { + public void testMatchForNonExistentFile() throws Exception { List<String> expected = ImmutableList.of(); temporaryFolder.newFile("aa"); http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java index ce8e7e8..16c4f93 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.List; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.options.GcsOptions; /** @@ -41,6 +42,11 @@ class GcsFileSystem extends FileSystem<GcsResourceId> { } @Override + protected List<MatchResult> match(List<String> specs) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions) throws IOException { return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType()); http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java 
---------------------------------------------------------------------- diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index 25381b8..f4e35ac 100644 --- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import org.apache.beam.sdk.io.FileSystem; import org.apache.beam.sdk.io.fs.CreateOptions; +import org.apache.beam.sdk.io.fs.MatchResult; /** * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as @@ -34,6 +35,11 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> { HadoopFileSystem() {} @Override + protected List<MatchResult> match(List<String> specs) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions) throws IOException { throw new UnsupportedOperationException();