Introduces EmptyMatchTreatment parameter to FileSystems.match()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db9aede2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db9aede2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db9aede2 Branch: refs/heads/master Commit: db9aede289f8546bb30113353f07aa75daa83eba Parents: 5e43b23 Author: Eugene Kirpichov <[email protected]> Authored: Thu Aug 3 14:43:48 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Aug 4 16:38:23 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/FileBasedSource.java | 52 ++++++++++---------- .../org/apache/beam/sdk/io/FileSystems.java | 46 +++++++++++++++++ .../java/org/apache/beam/sdk/io/TextSource.java | 7 ++- .../beam/sdk/io/fs/EmptyMatchTreatment.java | 46 +++++++++++++++++ .../org/apache/beam/sdk/io/fs/MatchResult.java | 5 +- .../apache/beam/sdk/io/FileBasedSourceTest.java | 51 +++++++++++++++++++ 6 files changed, 180 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index d4413c9..7f865de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -23,19 +23,17 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import java.io.IOException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.ListIterator; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -68,6 +66,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class); private final ValueProvider<String> fileOrPatternSpec; + private final EmptyMatchTreatment emptyMatchTreatment; @Nullable private MatchResult.Metadata singleFileMetadata; private final Mode mode; @@ -80,15 +79,28 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } /** - * Create a {@code FileBaseSource} based on a file or a file pattern specification. + * Create a {@code FileBaseSource} based on a file or a file pattern specification, with the given + * strategy for treating filepatterns that do not match any files. */ - protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) { + protected FileBasedSource( + ValueProvider<String> fileOrPatternSpec, + EmptyMatchTreatment emptyMatchTreatment, + long minBundleSize) { super(0, Long.MAX_VALUE, minBundleSize); - mode = Mode.FILEPATTERN; + this.mode = Mode.FILEPATTERN; + this.emptyMatchTreatment = emptyMatchTreatment; this.fileOrPatternSpec = fileOrPatternSpec; } /** + * Like {@link #FileBasedSource(ValueProvider, EmptyMatchTreatment, long)}, but uses the default + * value of {@link EmptyMatchTreatment#DISALLOW}. + */ + protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) { + this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize); + } + + /** * Create a {@code FileBasedSource} based on a single file. This constructor must be used when * creating a new {@code FileBasedSource} for a subrange of a single file. * Additionally, this constructor must be used to create new {@code FileBasedSource}s when @@ -110,6 +122,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { mode = Mode.SINGLE_FILE_OR_SUBRANGE; this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata"); this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString()); + + // This field will be unused in this mode. + this.emptyMatchTreatment = null; } /** @@ -204,14 +219,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { if (mode == Mode.FILEPATTERN) { long totalSize = 0; - List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern)); - MatchResult result = Iterables.getOnlyElement(inputs); - checkArgument( - result.status() == Status.OK, - "Error matching the pattern or glob %s: status %s", - fileOrPattern, - result.status()); - List<Metadata> allMatches = result.metadata(); + List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata(); for (Metadata metadata : allMatches) { totalSize += metadata.sizeBytes(); } @@ -254,9 +262,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern); - checkArgument(!expandedFiles.isEmpty(), - "Unable to find any files matching %s", fileOrPattern); + List<Metadata> expandedFiles = + FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata(); List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size()); for (Metadata metadata : expandedFiles) { FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes()); @@ -327,7 +334,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { if (mode == Mode.FILEPATTERN) { long startTime = System.currentTimeMillis(); - List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern); + List<Metadata> fileMetadata = + FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata(); + LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern); List<FileBasedReader<T>> fileReaders = new ArrayList<>(); for (Metadata metadata : fileMetadata) { long endOffset = metadata.sizeBytes(); @@ -389,13 +398,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { return metadata.sizeBytes(); } - private static List<Metadata> expandFilePattern(String fileOrPatternSpec) throws IOException { - MatchResult matches = - Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec))); - LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPatternSpec); - return ImmutableList.copyOf(matches.metadata()); - } - /** * A {@link Source.Reader reader} that implements code common to readers of * {@code FileBasedSource}s. http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index bd4668f..96394b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Status; @@ -72,6 +73,8 @@ public class FileSystems { public static final String DEFAULT_SCHEME = "file"; private static final Pattern FILE_SCHEME_PATTERN = Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*"); + private static final Pattern GLOB_PATTERN = + Pattern.compile("[*?{}]"); private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM = new AtomicReference<Map<String, FileSystem>>( @@ -79,6 +82,11 @@ public class FileSystems { /********************************** METHODS FOR CLIENT **********************************/ + /** Checks whether the given spec contains a glob wildcard character. */ + public static boolean hasGlobWildcard(String spec) { + return GLOB_PATTERN.matcher(spec).find(); + } + /** * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}. * Callers should use {@link #match} to resolve users specs ambiguities before @@ -102,6 +110,9 @@ public class FileSystems { * <p>In case the spec schemes don't match any known {@link FileSystem} implementations, * FileSystems will attempt to use {@link LocalFileSystem} to resolve a path. * + * <p>Specs that do not match any resources are treated according to + * {@link EmptyMatchTreatment#DISALLOW}. + * * @return {@code List<MatchResult>} in the same order of the input specs. * * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes. @@ -114,6 +125,17 @@ public class FileSystems { return getFileSystemInternal(getOnlyScheme(specs)).match(specs); } + /** Like {@link #match(List)}, but with a configurable {@link EmptyMatchTreatment}. */ + public static List<MatchResult> match(List<String> specs, EmptyMatchTreatment emptyMatchTreatment) + throws IOException { + List<MatchResult> matches = getFileSystemInternal(getOnlyScheme(specs)).match(specs); + List<MatchResult> res = Lists.newArrayListWithExpectedSize(matches.size()); + for (int i = 0; i < matches.size(); i++) { + res.add(maybeAdjustEmptyMatchResult(specs.get(i), matches.get(i), emptyMatchTreatment)); + } + return res; + } + /** * Like {@link #match(List)}, but for a single resource specification. @@ -130,6 +152,30 @@ public class FileSystems { matches); return matches.get(0); } + + /** Like {@link #match(String)}, but with a configurable {@link EmptyMatchTreatment}. */ + public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment) + throws IOException { + MatchResult res = match(spec); + return maybeAdjustEmptyMatchResult(spec, res, emptyMatchTreatment); + } + + private static MatchResult maybeAdjustEmptyMatchResult( + String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment) + throws IOException { + if (res.status() != Status.NOT_FOUND) { + return res; + } + boolean notFoundAllowed = + emptyMatchTreatment == EmptyMatchTreatment.ALLOW + || (FileSystems.hasGlobWildcard(spec) + && emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD); + if (notFoundAllowed) { + return MatchResult.create(Status.OK, Collections.<Metadata>emptyList()); + } + return res; + } + /** * Returns the {@link Metadata} for a single file resource. Expects a resource specification * {@code spec} that matches a single result. http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 86c73d5..29188dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -28,6 +28,7 @@ import java.nio.channels.SeekableByteChannel; import java.util.NoSuchElementException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; @@ -48,7 +49,11 @@ import org.apache.beam.sdk.options.ValueProvider; @VisibleForTesting class TextSource extends FileBasedSource<String> { TextSource(ValueProvider<String> fileSpec) { - super(fileSpec, 1L); + this(fileSpec, EmptyMatchTreatment.DISALLOW); + } + + TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment) { + super(fileSpec, emptyMatchTreatment, 1L); } private TextSource(MatchResult.Metadata metadata, long start, long end) { http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java new file mode 100644 index 0000000..8e12993 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.fs; + +import org.apache.beam.sdk.io.fs.MatchResult.Status; + +/** + * Options for allowing or disallowing filepatterns that match no resources in {@link + * org.apache.beam.sdk.io.FileSystems#match}. + */ +public enum EmptyMatchTreatment { + /** + * Filepatterns matching no resources are allowed. For such a filepattern, {@link + * MatchResult#status} will be {@link Status#OK} and {@link MatchResult#metadata} will return an + * empty list. + */ + ALLOW, + + /** + * Filepatterns matching no resources are disallowed. For such a filepattern, {@link + * MatchResult#status} will be {@link Status#NOT_FOUND} and {@link MatchResult#metadata} will + * throw a {@link java.io.FileNotFoundException}. + */ + DISALLOW, + + /** + * Filepatterns matching no resources are allowed if the filepattern contains a glob wildcard + * character, and disallowed otherwise (i.e. if the filepattern specifies a single file). + */ + ALLOW_IF_WILDCARD +} http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java index 642c049..aa80b96 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java @@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue; import java.io.IOException; import java.io.Serializable; import java.util.List; +import org.apache.beam.sdk.io.FileSystems; /** * The result of {@link org.apache.beam.sdk.io.FileSystem#match}. @@ -78,7 +79,9 @@ public abstract class MatchResult { public abstract Status status(); /** - * {@link Metadata} of matched files. + * {@link Metadata} of matched files. Note that if {@link #status()} is {@link Status#NOT_FOUND}, + * this may either throw a {@link java.io.FileNotFoundException} or return an empty list, + * depending on the {@link EmptyMatchTreatment} used in the {@link FileSystems#match} call. */ public abstract List<Metadata> metadata() throws IOException; http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index 1bdb915..ea9e06b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; import org.apache.beam.sdk.io.Source.Reader; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -94,6 +95,15 @@ public class FileBasedSourceTest { } public TestFileBasedSource( + String fileOrPattern, + EmptyMatchTreatment emptyMatchTreatment, + long minBundleSize, + String splitHeader) { + super(StaticValueProvider.of(fileOrPattern), emptyMatchTreatment, minBundleSize); + this.splitHeader = splitHeader; + } + + public TestFileBasedSource( Metadata fileOrPattern, long minBundleSize, long startOffset, @@ -371,6 +381,47 @@ public class FileBasedSourceTest { } @Test + public void testEmptyFilepatternTreatmentDefaultDisallow() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + TestFileBasedSource source = + new TestFileBasedSource(new File(tempFolder.getRoot(), "doesNotExist").getPath(), 64, null); + thrown.expect(FileNotFoundException.class); + readFromSource(source, options); + } + + @Test + public void testEmptyFilepatternTreatmentAllow() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + TestFileBasedSource source = + new TestFileBasedSource( + new File(tempFolder.getRoot(), "doesNotExist").getPath(), + EmptyMatchTreatment.ALLOW, + 64, + null); + TestFileBasedSource sourceWithWildcard = + new TestFileBasedSource( + new File(tempFolder.getRoot(), "doesNotExist*").getPath(), + EmptyMatchTreatment.ALLOW_IF_WILDCARD, + 64, + null); + assertEquals(0, readFromSource(source, options).size()); + assertEquals(0, readFromSource(sourceWithWildcard, options).size()); + } + + @Test + public void testEmptyFilepatternTreatmentAllowIfWildcard() throws IOException { + PipelineOptions options = PipelineOptionsFactory.create(); + TestFileBasedSource source = + new TestFileBasedSource( + new File(tempFolder.getRoot(), "doesNotExist").getPath(), + EmptyMatchTreatment.ALLOW_IF_WILDCARD, + 64, + null); + thrown.expect(FileNotFoundException.class); + readFromSource(source, options); + } + + @Test public void testCloseUnstartedFilePatternReader() throws IOException { PipelineOptions options = PipelineOptionsFactory.create(); List<String> data1 = createStringDataset(3, 50);
