Repository: beam Updated Branches: refs/heads/master 8d337ff0e -> df36bd9d7
[BEAM-2512] Introduces TextIO.watchForNewFiles() and Match. Part of http://s.apache.org/textio-sdf, based on http://s.apache.org/beam-watch-transform. The Match transform can be useful for users who want to write their own file-based connectors, or for advanced use cases such as: watch for new subdirectories to appear in a directory (using Match), and then start watching each subdirectory for new files and reading them (using TextIO.watchForNewFiles()). Additionally, this change finally makes it configurable whether TextIO.read/readAll() allow filepatterns matching no files. Normal reads disallow empty filepatterns (to preserve old behavior), readAll() allows them if the filepattern contains a wildcard (which seems a reasonable default behavior that read() should have had from the beginning, but we can't change it), and watchForNewFiles() allows them unconditionally (because files might appear later). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe002c22 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe002c22 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe002c22 Branch: refs/heads/master Commit: fe002c221602a543b99afd6db910a7a60b259fa4 Parents: db9aede Author: Eugene Kirpichov <[email protected]> Authored: Thu Aug 3 14:44:35 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Aug 4 16:38:23 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/annotations/Experimental.java | 5 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 2 + .../main/java/org/apache/beam/sdk/io/Match.java | 156 +++++++++++++++++++ .../beam/sdk/io/ReadAllViaFileBasedSource.java | 46 +++--- .../java/org/apache/beam/sdk/io/TextIO.java | 156 ++++++++++++++++--- .../org/apache/beam/sdk/transforms/DoFn.java | 11 +- .../org/apache/beam/sdk/transforms/Watch.java | 16 +- .../org/apache/beam/sdk/io/TextIOReadTest.java | 54 ++++++- 8 files changed, 384 insertions(+), 
62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index 8224ebb..80c4613 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -72,8 +72,9 @@ public @interface Experimental { OUTPUT_TIME, /** - * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. - * Do not use: API is unstable and runner support is incomplete. + * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. See <a + * href="https://beam.apache.org/documentation/runners/capability-matrix/">capability matrix</a> + * for runner support. 
*/ SPLITTABLE_DO_FN, http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index cd5857c..653b806 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -478,6 +478,7 @@ public class AvroIO { public PCollection<T> expand(PCollection<String> input) { checkNotNull(getSchema(), "schema"); return input + .apply(Match.filepatterns()) .apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( @@ -632,6 +633,7 @@ public class AvroIO { } }; return input + .apply(Match.filepatterns()) .apply( "Parse all via FileBasedSource", new ReadAllViaFileBasedSource<>( http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java new file mode 100644 index 0000000..bb44fac --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.transforms.Watch.Growth.PollResult; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Matches each filepattern in a collection of filepatterns using {@link FileSystems#match}, and + * produces a collection of matched resources (both files and directories) as {@link Metadata}. + * Resources are not deduplicated between filepatterns, i.e. if the same resource matches multiple + * filepatterns, it will be produced multiple times. + * + * <p>By default, this transform matches each filepattern once and produces a bounded {@link + * PCollection}. To continuously watch each filepattern for new matches, use {@link + * Filepatterns#continuously(Duration, TerminationCondition)} - this will produce an unbounded + * {@link PCollection}. 
+ * + * <p>By default, filepatterns matching no resources are treated according to {@link + * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link + * Filepatterns#withEmptyMatchTreatment}. + */ +public class Match { + private static final Logger LOG = LoggerFactory.getLogger(Match.class); + + /** See {@link Match}. */ + public static Filepatterns filepatterns() { + return new AutoValue_Match_Filepatterns.Builder() + .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD) + .build(); + } + + /** Implementation of {@link #filepatterns}. */ + @AutoValue + public abstract static class Filepatterns + extends PTransform<PCollection<String>, PCollection<Metadata>> { + abstract EmptyMatchTreatment getEmptyMatchTreatment(); + + @Nullable + abstract Duration getWatchInterval(); + + @Nullable + abstract TerminationCondition<String, ?> getWatchTerminationCondition(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); + + abstract Builder setWatchInterval(Duration watchInterval); + + abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition); + + abstract Filepatterns build(); + } + + /** + * Sets whether or not filepatterns matching no files are allowed. When using {@link + * #continuously}, they are always allowed, and this parameter is ignored. + */ + public Filepatterns withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return toBuilder().setEmptyMatchTreatment(treatment).build(); + } + + /** + * Continuously watches for new resources matching the filepattern, repeatedly matching it at + * the given interval, until the given termination condition is reached. The returned {@link + * PCollection} is unbounded. + * + * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}. 
+ * + * @see TerminationCondition + */ + @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) + public Filepatterns continuously( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return toBuilder() + .setWatchInterval(pollInterval) + .setWatchTerminationCondition(terminationCondition) + .build(); + } + + @Override + public PCollection<Metadata> expand(PCollection<String> input) { + if (getWatchInterval() == null) { + return input.apply("Match filepatterns", ParDo.of(new MatchFn(getEmptyMatchTreatment()))); + } else { + return input + .apply( + "Continuously match filepatterns", + Watch.growthOf(new MatchPollFn()) + .withPollInterval(getWatchInterval()) + .withTerminationPerInput(getWatchTerminationCondition())) + .apply(Values.<Metadata>create()); + } + } + + private static class MatchFn extends DoFn<String, Metadata> { + private final EmptyMatchTreatment emptyMatchTreatment; + + public MatchFn(EmptyMatchTreatment emptyMatchTreatment) { + this.emptyMatchTreatment = emptyMatchTreatment; + } + + @ProcessElement + public void process(ProcessContext c) throws Exception { + String filepattern = c.element(); + MatchResult match = FileSystems.match(filepattern, emptyMatchTreatment); + LOG.info("Matched {} files for pattern {}", match.metadata().size(), filepattern); + for (Metadata metadata : match.metadata()) { + c.output(metadata); + } + } + } + + private static class MatchPollFn implements Watch.Growth.PollFn<String, Metadata> { + @Override + public PollResult<Metadata> apply(String input, Instant timestamp) throws Exception { + return PollResult.incomplete( + Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java index 66aa41e..990f508 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java @@ -21,7 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -33,10 +34,14 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** - * Reads each filepattern in the input {@link PCollection} using given parameters for splitting - * files into offset ranges and for creating a {@link FileBasedSource} for a file. + * Reads each file in the input {@link PCollection} of {@link Metadata} using given parameters for + * splitting files into offset ranges and for creating a {@link FileBasedSource} for a file. The + * input {@link PCollection} must not contain {@link ResourceId#isDirectory directories}. + * + * <p>To obtain the collection of {@link Metadata} from a filepattern, use {@link + * Match#filepatterns()}. 
*/ -class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PCollection<T>> { +class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, PCollection<T>> { private final SerializableFunction<String, Boolean> isSplittable; private final long desiredBundleSizeBytes; private final SerializableFunction<String, FileBasedSource<T>> createSource; @@ -51,13 +56,12 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl } @Override - public PCollection<T> expand(PCollection<String> input) { + public PCollection<T> expand(PCollection<Metadata> input) { return input - .apply("Expand glob", ParDo.of(new ExpandGlobFn())) .apply( "Split into ranges", ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes))) - .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>()) + .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>()) .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource))); } @@ -86,23 +90,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl } } - private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> { - @ProcessElement - public void process(ProcessContext c) throws Exception { - MatchResult match = FileSystems.match(c.element()); - checkArgument( - match.status().equals(MatchResult.Status.OK), - "Failed to match filepattern %s: %s", - c.element(), - match.status()); - for (MatchResult.Metadata metadata : match.metadata()) { - c.output(metadata); - } - } - } - - private static class SplitIntoRangesFn - extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, OffsetRange>> { + private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> { private final SerializableFunction<String, Boolean> isSplittable; private final long desiredBundleSizeBytes; @@ -114,7 +102,11 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl 
@ProcessElement public void process(ProcessContext c) { - MatchResult.Metadata metadata = c.element(); + Metadata metadata = c.element(); + checkArgument( + !metadata.resourceId().isDirectory(), + "Resource %s is a directory", + metadata.resourceId()); if (!metadata.isReadSeekEfficient() || !isSplittable.apply(metadata.resourceId().toString())) { c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes()))); @@ -127,7 +119,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl } } - private static class ReadFileRangesFn<T> extends DoFn<KV<MatchResult.Metadata, OffsetRange>, T> { + private static class ReadFileRangesFn<T> extends DoFn<KV<Metadata, OffsetRange>, T> { private final SerializableFunction<String, FileBasedSource<T>> createSource; private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) { @@ -136,7 +128,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl @ProcessElement public void process(ProcessContext c) throws IOException { - MatchResult.Metadata metadata = c.element().getKey(); + Metadata metadata = c.element().getKey(); OffsetRange range = c.element().getValue(); FileBasedSource<T> source = createSource.apply(metadata.toString()); try (BoundedSource.BoundedReader<T> reader = http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 9a14ad9..612f5c5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params; import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations; import 
org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; @@ -44,10 +45,12 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; /** * {@link PTransform}s for reading and writing text files. @@ -57,9 +60,16 @@ import org.apache.beam.sdk.values.PDone; * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link * PCollection}, apply {@link TextIO#readAll()}. * - * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each - * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', - * or '\r\n'). + * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to + * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n'). + * + * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} and {@link + * ReadAll#watchForNewFiles} allow streaming of new files matching the filepattern(s). + * + * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readAll} + * allows them in case the filepattern contains a glob wildcard character. 
Use {@link + * TextIO.Read#withEmptyMatchTreatment} and {@link TextIO.ReadAll#withEmptyMatchTreatment} to + * configure this behavior. * * <p>Example 1: reading a file or filepattern. * @@ -88,6 +98,20 @@ import org.apache.beam.sdk.values.PDone; * PCollection<String> lines = filenames.apply(TextIO.readAll()); * }</pre> * + * <p>Example 3: streaming new files matching a filepattern. + * + * <pre>{@code + * Pipeline p = ...; + * + * PCollection<String> lines = p.apply(TextIO.read() + * .from("/local/path/to/files/*") + * .watchForNewFiles( + * // Check for new files every minute + * Duration.standardMinutes(1), + * // Stop watching the filepattern if no new files appear within an hour + * afterTimeSinceNewOutput(Duration.standardHours(1)))); + * }</pre> + * * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write. * @@ -153,6 +177,7 @@ public class TextIO { return new AutoValue_TextIO_Read.Builder() .setCompressionType(CompressionType.AUTO) .setHintMatchesManyFiles(false) + .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW) .build(); } @@ -173,6 +198,7 @@ public class TextIO { // but is not so large as to exhaust a typical runner's maximum amount of output per // ProcessElement call. 
.setDesiredBundleSizeBytes(64 * 1024 * 1024L) + .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD) .build(); } @@ -219,7 +245,15 @@ public class TextIO { public abstract static class Read extends PTransform<PBegin, PCollection<String>> { @Nullable abstract ValueProvider<String> getFilepattern(); abstract CompressionType getCompressionType(); + + @Nullable + abstract Duration getWatchForNewFilesInterval(); + + @Nullable + abstract TerminationCondition getWatchForNewFilesTerminationCondition(); + abstract boolean getHintMatchesManyFiles(); + abstract EmptyMatchTreatment getEmptyMatchTreatment(); abstract Builder toBuilder(); @@ -227,7 +261,10 @@ public class TextIO { abstract static class Builder { abstract Builder setFilepattern(ValueProvider<String> filepattern); abstract Builder setCompressionType(CompressionType compressionType); + abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval); + abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition); abstract Builder setHintMatchesManyFiles(boolean hintManyFiles); + abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); abstract Read build(); } @@ -257,8 +294,7 @@ public class TextIO { } /** - * Returns a new transform for reading from text files that's like this one but - * reads from input sources using the specified compression type. + * Reads from input sources using the specified compression type. * * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}. */ @@ -267,6 +303,23 @@ public class TextIO { } /** + * Continuously watches for new files matching the filepattern, polling it at the given + * interval, until the given termination condition is reached. The returned {@link PCollection} + * is unbounded. + * + * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}. 
+ * + * @see TerminationCondition + */ + @Experimental(Kind.SPLITTABLE_DO_FN) + public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) { + return toBuilder() + .setWatchForNewFilesInterval(pollInterval) + .setWatchForNewFilesTerminationCondition(terminationCondition) + .build(); + } + + /** * Hints that the filepattern specified in {@link #from(String)} matches a very large number of * files. * @@ -279,20 +332,40 @@ public class TextIO { return toBuilder().setHintMatchesManyFiles(true).build(); } + /** + * Configures whether or not a filepattern matching no files is allowed. When using {@link + * #watchForNewFiles}, it is always allowed and this parameter is ignored. + */ + public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return toBuilder().setEmptyMatchTreatment(treatment).build(); + } + @Override public PCollection<String> expand(PBegin input) { checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform"); - return getHintMatchesManyFiles() - ? input - .apply( - "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) - .apply(readAll().withCompressionType(getCompressionType())) - : input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource())); + if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) { + return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource())); + } + // All other cases go through ReadAll. + ReadAll readAll = + readAll() + .withCompressionType(getCompressionType()) + .withEmptyMatchTreatment(getEmptyMatchTreatment()); + if (getWatchForNewFilesInterval() != null) { + readAll = + readAll.watchForNewFiles( + getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition()); + } + return input + .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of())) + .apply("Via ReadAll", readAll); } // Helper to create a source specific to the requested compression type. 
protected FileBasedSource<String> getSource() { - return wrapWithCompression(new TextSource(getFilepattern()), getCompressionType()); + return wrapWithCompression( + new TextSource(getFilepattern(), getEmptyMatchTreatment()), + getCompressionType()); } private static FileBasedSource<String> wrapWithCompression( @@ -330,10 +403,17 @@ public class TextIO { String filepatternDisplay = getFilepattern().isAccessible() ? getFilepattern().get() : getFilepattern().toString(); builder - .add(DisplayData.item("compressionType", getCompressionType().toString()) - .withLabel("Compression Type")) - .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) - .withLabel("File Pattern")); + .add( + DisplayData.item("compressionType", getCompressionType().toString()) + .withLabel("Compression Type")) + .addIfNotNull( + DisplayData.item("filePattern", filepatternDisplay).withLabel("File Pattern")) + .add( + DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) + .withLabel("Treatment of filepatterns that match no files")) + .addIfNotNull( + DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval()) + .withLabel("Interval to watch for new files")); } } @@ -344,6 +424,14 @@ public class TextIO { public abstract static class ReadAll extends PTransform<PCollection<String>, PCollection<String>> { abstract CompressionType getCompressionType(); + + @Nullable + abstract Duration getWatchForNewFilesInterval(); + + @Nullable + abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition(); + + abstract EmptyMatchTreatment getEmptyMatchTreatment(); abstract long getDesiredBundleSizeBytes(); abstract Builder toBuilder(); @@ -351,6 +439,10 @@ public class TextIO { @AutoValue.Builder abstract static class Builder { abstract Builder setCompressionType(CompressionType compressionType); + abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval); + abstract Builder setWatchForNewFilesTerminationCondition( + 
TerminationCondition<String, ?> condition); + abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment); abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes); abstract ReadAll build(); @@ -361,6 +453,21 @@ public class TextIO { return toBuilder().setCompressionType(compressionType).build(); } + /** Same as {@link Read#withEmptyMatchTreatment}. */ + public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return toBuilder().setEmptyMatchTreatment(treatment).build(); + } + + /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */ + @Experimental(Kind.SPLITTABLE_DO_FN) + public ReadAll watchForNewFiles( + Duration pollInterval, TerminationCondition<String, ?> terminationCondition) { + return toBuilder() + .setWatchForNewFilesInterval(pollInterval) + .setWatchForNewFilesTerminationCondition(terminationCondition) + .build(); + } + @VisibleForTesting ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) { return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build(); @@ -368,13 +475,21 @@ public class TextIO { @Override public PCollection<String> expand(PCollection<String> input) { + Match.Filepatterns matchFilepatterns = + Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment()); + if (getWatchForNewFilesInterval() != null) { + matchFilepatterns = + matchFilepatterns.continuously( + getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition()); + } return input + .apply(matchFilepatterns) .apply( "Read all via FileBasedSource", new ReadAllViaFileBasedSource<>( new IsSplittableFn(getCompressionType()), getDesiredBundleSizeBytes(), - new CreateTextSourceFn(getCompressionType()))) + new CreateTextSourceFn(getCompressionType(), getEmptyMatchTreatment()))) .setCoder(StringUtf8Coder.of()); } @@ -390,15 +505,18 @@ public class TextIO { private static class CreateTextSourceFn implements SerializableFunction<String, FileBasedSource<String>> { private final 
CompressionType compressionType; + private final EmptyMatchTreatment emptyMatchTreatment; - private CreateTextSourceFn(CompressionType compressionType) { + private CreateTextSourceFn( + CompressionType compressionType, EmptyMatchTreatment emptyMatchTreatment) { this.compressionType = compressionType; + this.emptyMatchTreatment = emptyMatchTreatment; } @Override public FileBasedSource<String> apply(String input) { return Read.wrapWithCompression( - new TextSource(StaticValueProvider.of(input)), compressionType); + new TextSource(StaticValueProvider.of(input), emptyMatchTreatment), compressionType); } } http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 37c6263..3e023db 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -524,12 +524,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <li>It must return {@code void}. * </ul> * - * <h2>Splittable DoFn's (WARNING: work in progress, do not use)</h2> + * <h2>Splittable DoFn's</h2> * * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter * whose type is a subtype of {@link RestrictionTracker}. This is an advanced feature and an - * overwhelming majority of users will never need to write a splittable {@link DoFn}. Right now - * the implementation of this feature is in progress and it's not ready for any use. + * overwhelming majority of users will never need to write a splittable {@link DoFn}. + * + * <p>Not all runners support Splittable DoFn. 
See the + * <a href="https://beam.apache.org/documentation/runners/capability-matrix/">capability + * matrix</a>. * * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>). @@ -558,8 +561,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * </ul> * * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods. - * - * <p>More documentation will be added when the feature becomes ready for general usage. */ @Documented @Retention(RetentionPolicy.RUNTIME) http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java index fc6f18d..9da2408 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java @@ -38,7 +38,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; -import java.lang.reflect.TypeVariable; import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; @@ -64,6 +63,8 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.ReadableDuration; @@ -554,14 +555,13 @@ public class Watch { if (outputCoder == null) { // If a coder was not specified explicitly, infer it from 
the OutputT type parameter // of the PollFn. - TypeDescriptor<?> superDescriptor = - TypeDescriptor.of(getPollFn().getClass()).getSupertype(PollFn.class); - TypeVariable typeVariable = superDescriptor.getTypeParameter("OutputT"); - @SuppressWarnings("unchecked") - TypeDescriptor<OutputT> descriptor = - (TypeDescriptor<OutputT>) superDescriptor.resolveType(typeVariable); + TypeDescriptor<OutputT> outputT = + TypeDescriptors.extractFromTypeParameters( + getPollFn(), + PollFn.class, + new TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() {}); try { - outputCoder = input.getPipeline().getCoderRegistry().getCoder(descriptor); + outputCoder = input.getPipeline().getCoderRegistry().getCoder(outputT); } catch (CannotProvideCoderException e) { throw new RuntimeException( "Unable to infer coder for OutputT. Specify it explicitly using withOutputCoder()."); http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java index 8ad6030..aa6090d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java @@ -25,6 +25,7 @@ import static org.apache.beam.sdk.io.TextIO.CompressionType.DEFLATE; import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP; import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED; import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP; +import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static 
org.hamcrest.Matchers.containsInAnyOrder; @@ -63,6 +64,7 @@ import java.util.zip.ZipOutputStream; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.TextIO.CompressionType; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; @@ -70,6 +72,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSplittableParDo; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -78,6 +81,7 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; +import org.joda.time.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -787,7 +791,8 @@ public class TextIOReadTest { private TextSource prepareSource(byte[] data) throws IOException { Path path = Files.createTempFile(tempFolder, "tempfile", "ext"); Files.write(path, data); - return new TextSource(ValueProvider.StaticValueProvider.of(path.toString())); + return new TextSource( + ValueProvider.StaticValueProvider.of(path.toString()), EmptyMatchTreatment.DISALLOW); } @Test @@ -872,4 +877,51 @@ public class TextIOReadTest { PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE)); p.run(); } + + @Test + @Category({NeedsRunner.class, UsesSplittableParDo.class}) + public void testReadWatchForNewFiles() throws IOException, InterruptedException 
{ + final Path basePath = tempFolder.resolve("readWatch"); + basePath.toFile().mkdir(); + PCollection<String> lines = + p.apply( + TextIO.read() + .from(basePath.resolve("*").toString()) + // Make sure that compression type propagates into readAll() + .withCompressionType(ZIP) + .watchForNewFiles( + Duration.millis(100), afterTimeSinceNewOutput(Duration.standardSeconds(3)))); + + Thread writer = + new Thread() { + @Override + public void run() { + try { + Thread.sleep(1000); + writeToFile( + Arrays.asList("a.1", "a.2"), + basePath.resolve("fileA").toString(), + CompressionType.ZIP); + Thread.sleep(300); + writeToFile( + Arrays.asList("b.1", "b.2"), + basePath.resolve("fileB").toString(), + CompressionType.ZIP); + Thread.sleep(300); + writeToFile( + Arrays.asList("c.1", "c.2"), + basePath.resolve("fileC").toString(), + CompressionType.ZIP); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + writer.start(); + + PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", "c.2"); + p.run(); + + writer.join(); + } }
