Repository: beam
Updated Branches:
  refs/heads/master 8d337ff0e -> df36bd9d7


[BEAM-2512] Introduces TextIO.watchForNewFiles() and Match

Part of http://s.apache.org/textio-sdf, based on
http://s.apache.org/beam-watch-transform.

The Match transform can be useful for users who want to write their own
file-based connectors, or for advanced use cases such as: watch for new
subdirectories to appear in a directory (using Match), and then start
watching each subdirectory for new files and reading them
(using TextIO.watchForNewFiles()).

Additionally, finally makes it configurable whether TextIO.read/readAll()
allow filepatterns matching no files.

Normal reads disallow empty filepatterns (to preserve old behavior), readAll()
allows them if the filepattern contains a wildcard (which seems a reasonable
default behavior that read() should have had from the beginning, but we can't
change it), and watchForNewFiles() allows them unconditionally (because files
might appear later).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe002c22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe002c22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe002c22

Branch: refs/heads/master
Commit: fe002c221602a543b99afd6db910a7a60b259fa4
Parents: db9aede
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Aug 3 14:44:35 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Aug 4 16:38:23 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/annotations/Experimental.java      |   5 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +
 .../main/java/org/apache/beam/sdk/io/Match.java | 156 +++++++++++++++++++
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  |  46 +++---
 .../java/org/apache/beam/sdk/io/TextIO.java     | 156 ++++++++++++++++---
 .../org/apache/beam/sdk/transforms/DoFn.java    |  11 +-
 .../org/apache/beam/sdk/transforms/Watch.java   |  16 +-
 .../org/apache/beam/sdk/io/TextIOReadTest.java  |  54 ++++++-
 8 files changed, 384 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 8224ebb..80c4613 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -72,8 +72,9 @@ public @interface Experimental {
     OUTPUT_TIME,
 
     /**
-     * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
-     * Do not use: API is unstable and runner support is incomplete.
+     * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. 
See <a
+     * 
href="https://beam.apache.org/documentation/runners/capability-matrix/">capability
 matrix</a>
+     * for runner support.
      */
     SPLITTABLE_DO_FN,
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index cd5857c..653b806 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -478,6 +478,7 @@ public class AvroIO {
     public PCollection<T> expand(PCollection<String> input) {
       checkNotNull(getSchema(), "schema");
       return input
+          .apply(Match.filepatterns())
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
@@ -632,6 +633,7 @@ public class AvroIO {
             }
           };
       return input
+          .apply(Match.filepatterns())
           .apply(
               "Parse all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
new file mode 100644
index 0000000..bb44fac
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Matches each filepattern in a collection of filepatterns using {@link 
FileSystems#match}, and
+ * produces a collection of matched resources (both files and directories) as 
{@link Metadata}.
+ * Resources are not deduplicated between filepatterns, i.e. if the same 
resource matches multiple
+ * filepatterns, it will be produced multiple times.
+ *
+ * <p>By default, this transform matches each filepattern once and produces a 
bounded {@link
+ * PCollection}. To continuously watch each filepattern for new matches, use 
{@link
+ * Filepatterns#continuously(Duration, TerminationCondition)} - this will 
produce an unbounded
+ * {@link PCollection}.
+ *
+ * <p>By default, filepatterns matching no resources are treated according to 
{@link
+ * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use 
{@link
+ * Filepatterns#withEmptyMatchTreatment}.
+ */
+public class Match {
+  private static final Logger LOG = LoggerFactory.getLogger(Match.class);
+
+  /** See {@link Match}. */
+  public static Filepatterns filepatterns() {
+    return new AutoValue_Match_Filepatterns.Builder()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+        .build();
+  }
+
+  /** Implementation of {@link #filepatterns}. */
+  @AutoValue
+  public abstract static class Filepatterns
+      extends PTransform<PCollection<String>, PCollection<Metadata>> {
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    @Nullable
+    abstract Duration getWatchInterval();
+
+    @Nullable
+    abstract TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder 
setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Filepatterns build();
+    }
+
+    /**
+     * Sets whether or not filepatterns matching no files are allowed. When 
using {@link
+     * #continuously}, they are always allowed, and this parameter is ignored.
+     */
+    public Filepatterns withEmptyMatchTreatment(EmptyMatchTreatment treatment) 
{
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    /**
+     * Continuously watches for new resources matching the filepattern, 
repeatedly matching it at
+     * the given interval, until the given termination condition is reached. 
The returned {@link
+     * PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting {@link 
Experimental.Kind#SPLITTABLE_DO_FN}.
+     *
+     * @see TerminationCondition
+     */
+    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+    public Filepatterns continuously(
+        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
+      return toBuilder()
+          .setWatchInterval(pollInterval)
+          .setWatchTerminationCondition(terminationCondition)
+          .build();
+    }
+
+    @Override
+    public PCollection<Metadata> expand(PCollection<String> input) {
+      if (getWatchInterval() == null) {
+        return input.apply("Match filepatterns", ParDo.of(new 
MatchFn(getEmptyMatchTreatment())));
+      } else {
+        return input
+            .apply(
+                "Continuously match filepatterns",
+                Watch.growthOf(new MatchPollFn())
+                    .withPollInterval(getWatchInterval())
+                    .withTerminationPerInput(getWatchTerminationCondition()))
+            .apply(Values.<Metadata>create());
+      }
+    }
+
+    private static class MatchFn extends DoFn<String, Metadata> {
+      private final EmptyMatchTreatment emptyMatchTreatment;
+
+      public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
+        this.emptyMatchTreatment = emptyMatchTreatment;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception {
+        String filepattern = c.element();
+        MatchResult match = FileSystems.match(filepattern, 
emptyMatchTreatment);
+        LOG.info("Matched {} files for pattern {}", match.metadata().size(), 
filepattern);
+        for (Metadata metadata : match.metadata()) {
+          c.output(metadata);
+        }
+      }
+    }
+
+    private static class MatchPollFn implements Watch.Growth.PollFn<String, 
Metadata> {
+      @Override
+      public PollResult<Metadata> apply(String input, Instant timestamp) 
throws Exception {
+        return PollResult.incomplete(
+            Instant.now(), FileSystems.match(input, 
EmptyMatchTreatment.ALLOW).metadata());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 66aa41e..990f508 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -21,7 +21,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -33,10 +34,14 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Reads each filepattern in the input {@link PCollection} using given 
parameters for splitting
- * files into offset ranges and for creating a {@link FileBasedSource} for a 
file.
+ * Reads each file in the input {@link PCollection} of {@link Metadata} using 
given parameters for
+ * splitting files into offset ranges and for creating a {@link 
FileBasedSource} for a file. The
+ * input {@link PCollection} must not contain {@link ResourceId#isDirectory 
directories}.
+ *
+ * <p>To obtain the collection of {@link Metadata} from a filepattern, use 
{@link
+ * Match#filepatterns()}.
  */
-class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, 
PCollection<T>> {
+class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, 
PCollection<T>> {
   private final SerializableFunction<String, Boolean> isSplittable;
   private final long desiredBundleSizeBytes;
   private final SerializableFunction<String, FileBasedSource<T>> createSource;
@@ -51,13 +56,12 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<String>, PColl
   }
 
   @Override
-  public PCollection<T> expand(PCollection<String> input) {
+  public PCollection<T> expand(PCollection<Metadata> input) {
     return input
-        .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
         .apply(
             "Split into ranges",
             ParDo.of(new SplitIntoRangesFn(isSplittable, 
desiredBundleSizeBytes)))
-        .apply("Reshuffle", new 
ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>())
+        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, 
OffsetRange>>())
         .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
   }
 
@@ -86,23 +90,7 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<String>, PColl
     }
   }
 
-  private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> 
{
-    @ProcessElement
-    public void process(ProcessContext c) throws Exception {
-      MatchResult match = FileSystems.match(c.element());
-      checkArgument(
-          match.status().equals(MatchResult.Status.OK),
-          "Failed to match filepattern %s: %s",
-          c.element(),
-          match.status());
-      for (MatchResult.Metadata metadata : match.metadata()) {
-        c.output(metadata);
-      }
-    }
-  }
-
-  private static class SplitIntoRangesFn
-      extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, 
OffsetRange>> {
+  private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, 
OffsetRange>> {
     private final SerializableFunction<String, Boolean> isSplittable;
     private final long desiredBundleSizeBytes;
 
@@ -114,7 +102,11 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<String>, PColl
 
     @ProcessElement
     public void process(ProcessContext c) {
-      MatchResult.Metadata metadata = c.element();
+      Metadata metadata = c.element();
+      checkArgument(
+          !metadata.resourceId().isDirectory(),
+          "Resource %s is a directory",
+          metadata.resourceId());
       if (!metadata.isReadSeekEfficient()
           || !isSplittable.apply(metadata.resourceId().toString())) {
         c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
@@ -127,7 +119,7 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<String>, PColl
     }
   }
 
-  private static class ReadFileRangesFn<T> extends 
DoFn<KV<MatchResult.Metadata, OffsetRange>, T> {
+  private static class ReadFileRangesFn<T> extends DoFn<KV<Metadata, 
OffsetRange>, T> {
     private final SerializableFunction<String, FileBasedSource<T>> 
createSource;
 
     private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> 
createSource) {
@@ -136,7 +128,7 @@ class ReadAllViaFileBasedSource<T> extends 
PTransform<PCollection<String>, PColl
 
     @ProcessElement
     public void process(ProcessContext c) throws IOException {
-      MatchResult.Metadata metadata = c.element().getKey();
+      Metadata metadata = c.element().getKey();
       OffsetRange range = c.element().getValue();
       FileBasedSource<T> source = createSource.apply(metadata.toString());
       try (BoundedSource.BoundedReader<T> reader =

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 9a14ad9..612f5c5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -44,10 +45,12 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
 
 /**
  * {@link PTransform}s for reading and writing text files.
@@ -57,9 +60,16 @@ import org.apache.beam.sdk.values.PDone;
  * file(s) to be read. Alternatively, if the filenames to be read are 
themselves in a {@link
  * PCollection}, apply {@link TextIO#readAll()}.
  *
- * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String 
Strings}, each
- * corresponding to one line of an input UTF-8 text file (split into lines 
delimited by '\n', '\r',
- * or '\r\n').
+ * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, 
each corresponding to
+ * one line of an input UTF-8 text file (split into lines delimited by '\n', 
'\r', or '\r\n').
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link 
Read#watchForNewFiles} and {@link
+ * ReadAll#watchForNewFiles} allow streaming of new files matching the 
filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, 
and {@link #readAll}
+ * allows them in case the filepattern contains a glob wildcard character. Use 
{@link
+ * TextIO.Read#withEmptyMatchTreatment} and {@link 
TextIO.ReadAll#withEmptyMatchTreatment} to
+ * configure this behavior.
  *
  * <p>Example 1: reading a file or filepattern.
  *
@@ -88,6 +98,20 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<String> lines = filenames.apply(TextIO.readAll());
  * }</pre>
  *
+ * <p>Example 3: streaming new files matching a filepattern.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> lines = p.apply(TextIO.read()
+ *     .from("/local/path/to/files/*")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
  * <p>To write a {@link PCollection} to one or more text files, use {@code 
TextIO.write()}, using
  * {@link TextIO.Write#to(String)} to specify the output prefix of the files 
to write.
  *
@@ -153,6 +177,7 @@ public class TextIO {
     return new AutoValue_TextIO_Read.Builder()
         .setCompressionType(CompressionType.AUTO)
         .setHintMatchesManyFiles(false)
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .build();
   }
 
@@ -173,6 +198,7 @@ public class TextIO {
         // but is not so large as to exhaust a typical runner's maximum amount 
of output per
         // ProcessElement call.
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .build();
   }
 
@@ -219,7 +245,15 @@ public class TextIO {
   public abstract static class Read extends PTransform<PBegin, 
PCollection<String>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
     abstract CompressionType getCompressionType();
+
+    @Nullable
+    abstract Duration getWatchForNewFilesInterval();
+
+    @Nullable
+    abstract TerminationCondition getWatchForNewFilesTerminationCondition();
+
     abstract boolean getHintMatchesManyFiles();
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
 
     abstract Builder toBuilder();
 
@@ -227,7 +261,10 @@ public class TextIO {
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
       abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
+      abstract Builder 
setWatchForNewFilesTerminationCondition(TerminationCondition condition);
       abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
 
       abstract Read build();
     }
@@ -257,8 +294,7 @@ public class TextIO {
     }
 
     /**
-     * Returns a new transform for reading from text files that's like this 
one but
-     * reads from input sources using the specified compression type.
+     * Reads from input sources using the specified compression type.
      *
      * <p>If no compression type is specified, the default is {@link 
TextIO.CompressionType#AUTO}.
      */
@@ -267,6 +303,23 @@ public class TextIO {
     }
 
     /**
+     * Continuously watches for new files matching the filepattern, polling it 
at the given
+     * interval, until the given termination condition is reached. The 
returned {@link PCollection}
+     * is unbounded.
+     *
+     * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
+     *
+     * @see TerminationCondition
+     */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public Read watchForNewFiles(Duration pollInterval, TerminationCondition 
terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
+    /**
      * Hints that the filepattern specified in {@link #from(String)} matches a 
very large number of
      * files.
      *
@@ -279,20 +332,40 @@ public class TextIO {
       return toBuilder().setHintMatchesManyFiles(true).build();
     }
 
+    /**
+     * Configures whether or not a filepattern matching no files is allowed. 
When using {@link
+     * #watchForNewFiles}, it is always allowed and this parameter is ignored.
+     */
+    public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     @Override
     public PCollection<String> expand(PBegin input) {
       checkNotNull(getFilepattern(), "need to set the filepattern of a 
TextIO.Read transform");
-      return getHintMatchesManyFiles()
-          ? input
-              .apply(
-                  "Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
-              .apply(readAll().withCompressionType(getCompressionType()))
-          : input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) 
{
+        return input.apply("Read", 
org.apache.beam.sdk.io.Read.from(getSource()));
+      }
+      // All other cases go through ReadAll.
+      ReadAll readAll =
+          readAll()
+              .withCompressionType(getCompressionType())
+              .withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        readAll =
+            readAll.watchForNewFiles(
+                getWatchForNewFilesInterval(), 
getWatchForNewFilesTerminationCondition());
+      }
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), 
StringUtf8Coder.of()))
+          .apply("Via ReadAll", readAll);
     }
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      return wrapWithCompression(new TextSource(getFilepattern()), 
getCompressionType());
+      return wrapWithCompression(
+          new TextSource(getFilepattern(), getEmptyMatchTreatment()),
+          getCompressionType());
     }
 
     private static FileBasedSource<String> wrapWithCompression(
@@ -330,10 +403,17 @@ public class TextIO {
       String filepatternDisplay = getFilepattern().isAccessible()
         ? getFilepattern().get() : getFilepattern().toString();
       builder
-          .add(DisplayData.item("compressionType", 
getCompressionType().toString())
-            .withLabel("Compression Type"))
-          .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
-            .withLabel("File Pattern"));
+          .add(
+              DisplayData.item("compressionType", 
getCompressionType().toString())
+                  .withLabel("Compression Type"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", 
filepatternDisplay).withLabel("File Pattern"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", 
getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", 
getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 
@@ -344,6 +424,14 @@ public class TextIO {
   public abstract static class ReadAll
       extends PTransform<PCollection<String>, PCollection<String>> {
     abstract CompressionType getCompressionType();
+
+    @Nullable
+    abstract Duration getWatchForNewFilesInterval();
+
+    @Nullable
+    abstract TerminationCondition<String, ?> 
getWatchForNewFilesTerminationCondition();
+
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract long getDesiredBundleSizeBytes();
 
     abstract Builder toBuilder();
@@ -351,6 +439,10 @@ public class TextIO {
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setWatchForNewFilesInterval(Duration 
watchForNewFilesInterval);
+      abstract Builder setWatchForNewFilesTerminationCondition(
+          TerminationCondition<String, ?> condition);
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
       abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
 
       abstract ReadAll build();
@@ -361,6 +453,21 @@ public class TextIO {
       return toBuilder().setCompressionType(compressionType).build();
     }
 
+    /** Same as {@link Read#withEmptyMatchTreatment}. */
+    public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. 
*/
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public ReadAll watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> 
terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
     @VisibleForTesting
     ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return 
toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -368,13 +475,21 @@ public class TextIO {
 
     @Override
     public PCollection<String> expand(PCollection<String> input) {
+      Match.Filepatterns matchFilepatterns =
+          
Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        matchFilepatterns =
+            matchFilepatterns.continuously(
+                getWatchForNewFilesInterval(), 
getWatchForNewFilesTerminationCondition());
+      }
       return input
+          .apply(matchFilepatterns)
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
                   new IsSplittableFn(getCompressionType()),
                   getDesiredBundleSizeBytes(),
-                  new CreateTextSourceFn(getCompressionType())))
+                  new CreateTextSourceFn(getCompressionType(), 
getEmptyMatchTreatment())))
           .setCoder(StringUtf8Coder.of());
     }
 
@@ -390,15 +505,18 @@ public class TextIO {
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
       private final CompressionType compressionType;
+      private final EmptyMatchTreatment emptyMatchTreatment;
 
-      private CreateTextSourceFn(CompressionType compressionType) {
+      private CreateTextSourceFn(
+          CompressionType compressionType, EmptyMatchTreatment 
emptyMatchTreatment) {
         this.compressionType = compressionType;
+        this.emptyMatchTreatment = emptyMatchTreatment;
       }
 
       @Override
       public FileBasedSource<String> apply(String input) {
         return Read.wrapWithCompression(
-            new TextSource(StaticValueProvider.of(input)), compressionType);
+            new TextSource(StaticValueProvider.of(input), 
emptyMatchTreatment), compressionType);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 37c6263..3e023db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -524,12 +524,15 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
    * <li>It must return {@code void}.
    * </ul>
    *
-   * <h2>Splittable DoFn's (WARNING: work in progress, do not use)</h2>
+   * <h2>Splittable DoFn's</h2>
    *
    * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} 
method has a parameter
    * whose type is a subtype of {@link RestrictionTracker}. This is an 
advanced feature and an
-   * overwhelming majority of users will never need to write a splittable 
{@link DoFn}. Right now
-   * the implementation of this feature is in progress and it's not ready for 
any use.
+   * overwhelming majority of users will never need to write a splittable 
{@link DoFn}.
+   *
+   * <p>Not all runners support Splittable DoFn. See the
+   * <a 
href="https://beam.apache.org/documentation/runners/capability-matrix/">capability
+   * matrix</a>.
    *
   * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> 
for an overview of the
    * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, 
<i>restriction tracker</i>).
@@ -558,8 +561,6 @@ public abstract class DoFn<InputT, OutputT> implements 
Serializable, HasDisplayD
    * </ul>
    *
    * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these 
methods.
-   *
-   * <p>More documentation will be added when the feature becomes ready for 
general usage.
    */
   @Documented
   @Retention(RetentionPolicy.RUNTIME)

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index fc6f18d..9da2408 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -38,7 +38,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -64,6 +63,8 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDuration;
@@ -554,14 +555,13 @@ public class Watch {
       if (outputCoder == null) {
         // If a coder was not specified explicitly, infer it from the OutputT 
type parameter
         // of the PollFn.
-        TypeDescriptor<?> superDescriptor =
-            
TypeDescriptor.of(getPollFn().getClass()).getSupertype(PollFn.class);
-        TypeVariable typeVariable = 
superDescriptor.getTypeParameter("OutputT");
-        @SuppressWarnings("unchecked")
-        TypeDescriptor<OutputT> descriptor =
-            (TypeDescriptor<OutputT>) 
superDescriptor.resolveType(typeVariable);
+        TypeDescriptor<OutputT> outputT =
+            TypeDescriptors.extractFromTypeParameters(
+                getPollFn(),
+                PollFn.class,
+                new TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() 
{});
         try {
-          outputCoder = 
input.getPipeline().getCoderRegistry().getCoder(descriptor);
+          outputCoder = 
input.getPipeline().getCoderRegistry().getCoder(outputT);
         } catch (CannotProvideCoderException e) {
           throw new RuntimeException(
               "Unable to infer coder for OutputT. Specify it explicitly using 
withOutputCoder().");

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 8ad6030..aa6090d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.beam.sdk.io.TextIO.CompressionType.DEFLATE;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
+import static 
org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -63,6 +64,7 @@ import java.util.zip.ZipOutputStream;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -70,6 +72,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSplittableParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -78,6 +81,7 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import 
org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
+import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -787,7 +791,8 @@ public class TextIOReadTest {
   private TextSource prepareSource(byte[] data) throws IOException {
     Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
     Files.write(path, data);
-    return new 
TextSource(ValueProvider.StaticValueProvider.of(path.toString()));
+    return new TextSource(
+        ValueProvider.StaticValueProvider.of(path.toString()), 
EmptyMatchTreatment.DISALLOW);
   }
 
   @Test
@@ -872,4 +877,51 @@ public class TextIOReadTest {
     PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, 
LARGE));
     p.run();
   }
+
+  @Test
+  @Category({NeedsRunner.class, UsesSplittableParDo.class})
+  public void testReadWatchForNewFiles() throws IOException, 
InterruptedException {
+    final Path basePath = tempFolder.resolve("readWatch");
+    basePath.toFile().mkdir();
+    PCollection<String> lines =
+        p.apply(
+            TextIO.read()
+                .from(basePath.resolve("*").toString())
+                // Make sure that compression type propagates into readAll()
+                .withCompressionType(ZIP)
+                .watchForNewFiles(
+                    Duration.millis(100), 
afterTimeSinceNewOutput(Duration.standardSeconds(3))));
+
+    Thread writer =
+        new Thread() {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(1000);
+              writeToFile(
+                  Arrays.asList("a.1", "a.2"),
+                  basePath.resolve("fileA").toString(),
+                  CompressionType.ZIP);
+              Thread.sleep(300);
+              writeToFile(
+                  Arrays.asList("b.1", "b.2"),
+                  basePath.resolve("fileB").toString(),
+                  CompressionType.ZIP);
+              Thread.sleep(300);
+              writeToFile(
+                  Arrays.asList("c.1", "c.2"),
+                  basePath.resolve("fileC").toString(),
+                  CompressionType.ZIP);
+            } catch (IOException | InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    writer.start();
+
+    PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", 
"c.2");
+    p.run();
+
+    writer.join();
+  }
 }

Reply via email to