http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 78340f3..f1eb7c0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,38 +22,22 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.CompressedSource.CompressionMode;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -63,14 +47,13 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>To read a {@link PCollection} from one or more text files, use {@code 
TextIO.read()} to
  * instantiate a transform and use {@link TextIO.Read#from(String)} to specify 
the path of the
- * file(s) to be read. Alternatively, if the filenames to be read are 
themselves in a
- * {@link PCollection}, apply {@link TextIO#readAll()}.
+ * file(s) to be read.
  *
  * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String 
Strings}, each
  * corresponding to one line of an input UTF-8 text file (split into lines 
delimited by '\n', '\r',
  * or '\r\n').
  *
- * <p>Example 1: reading a file or filepattern.
+ * <p>Example:
  *
  * <pre>{@code
  * Pipeline p = ...;
@@ -79,24 +62,22 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<String> lines = 
p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
- * <p>Example 2: reading a PCollection of filenames.
- *
- * <pre>{@code
- * Pipeline p = ...;
- *
- * // E.g. the filenames might be computed from other data in the pipeline, or
- * // read from a data source.
- * PCollection<String> filenames = ...;
- *
- * // Read all files in the collection.
- * PCollection<String> lines = filenames.apply(TextIO.readAll());
- * }</pre>
- *
  * <p>To write a {@link PCollection} to one or more text files, use {@code 
TextIO.write()}, using
  * {@link TextIO.Write#to(String)} to specify the output prefix of the files 
to write.
  *
- * <p>For example:
+ * <p>By default, all input is put into the global window before writing. If 
per-window writes are
+ * desired - for example, when using a streaming runner -
+ * {@link TextIO.Write#withWindowedWrites()} will cause windowing and 
triggering to be
+ * preserved. When producing windowed writes, the number of output shards must 
be set explicitly
+ * using {@link TextIO.Write#withNumShards(int)}; some runners may set this 
for you to a
runner-chosen value, so you may not need to set it yourself. A {@link 
FilenamePolicy} can also be
+ * set in case you need better control over naming files created by unique 
windows.
+ * The default {@link DefaultFilenamePolicy} for producing unique filenames 
might not be appropriate
+ * for your use case.
+ *
+ * <p>Any existing files with the same names as generated output files will be 
overwritten.
  *
+ * <p>For example:
  * <pre>{@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<String> lines = ...;
@@ -104,49 +85,10 @@ import org.apache.beam.sdk.values.PDone;
  *
  * // Same as above, only with Gzip compression:
  * PCollection<String> lines = ...;
- * lines.apply(TextIO.write().to("/path/to/file.txt"))
+ * lines.apply(TextIO.write().to("/path/to/file.txt"))
  *      .withSuffix(".txt")
  *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
- *
- * <p>By default, all input is put into the global window before writing. If 
per-window writes are
- * desired - for example, when using a streaming runner - {@link 
TextIO.Write#withWindowedWrites()}
- * will cause windowing and triggering to be preserved. When producing 
windowed writes with a
- * streaming runner that supports triggers, the number of output shards must 
be set explicitly using
- * {@link TextIO.Write#withNumShards(int)}; some runners may set this for you 
to a runner-chosen
- * value, so you may need not set it yourself. If setting an explicit template 
using {@link
- * TextIO.Write#withShardNameTemplate(String)}, make sure that the template 
contains placeholders
- * for the window and the pane; W is expanded into the window text, and P into 
the pane; the default
- * template will include both the window and the pane in the filename.
- *
- * <p>If you want better control over how filenames are generated than the 
default policy allows, a
- * custom {@link FilenamePolicy} can also be set using {@link 
TextIO.Write#to(FilenamePolicy)}.
- *
- * <p>TextIO also supports dynamic, value-dependent file destinations. The 
most general form of this
- * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link 
DynamicDestinations} class
- * allows you to convert any input value into a custom destination object, and 
map that destination
- * object to a {@link FilenamePolicy}. This allows using different filename 
policies (or more
- * commonly, differently-configured instances of the same policy) based on the 
input record. Often
- * this is used in conjunction with {@link 
TextIO#writeCustomType(SerializableFunction)}, which
- * allows your {@link DynamicDestinations} object to examine the input type 
and takes a format
- * function to convert that type to a string for writing.
- *
- * <p>A convenience shortcut is provided for the case where the default naming 
policy is used, but
- * different configurations of this policy are wanted based on the input 
record. Default naming
- * policies can be configured using the {@link DefaultFilenamePolicy.Params} 
object.
- *
- * <pre>{@code
- * PCollection<UserEvent>> lines = ...;
- * lines.apply(TextIO.<UserEvent>writeCustomType(new FormatEvent())
- *      .to(new SerializableFunction<UserEvent, Params>() {
- *         public String apply(UserEvent value) {
- *           return new Params().withBaseFilename(baseDirectory + "/" + 
value.country());
- *         }
- *       }),
- *       new Params().withBaseFilename(baseDirectory + "/empty");
- * }</pre>
- *
- * <p>Any existing files with the same names as generated output files will be 
overwritten.
  */
 public class TextIO {
   /**
@@ -158,54 +100,16 @@ public class TextIO {
   }
 
   /**
-   * A {@link PTransform} that works like {@link #read}, but reads each file 
in a {@link
-   * PCollection} of filepatterns.
-   *
-   * <p>Can be applied to both bounded and unbounded {@link PCollection 
PCollections}, so this is
-   * suitable for reading a {@link PCollection} of filepatterns arriving as a 
stream. However, every
-   * filepattern is expanded once at the moment it is processed, rather than 
watched for new files
-   * matching the filepattern to appear. Likewise, every file is read once, 
rather than watched for
-   * new entries.
-   */
-  public static ReadAll readAll() {
-    return new AutoValue_TextIO_ReadAll.Builder()
-        .setCompressionType(CompressionType.AUTO)
-        // 64MB is a reasonable value that allows to amortize the cost of 
opening files,
-        // but is not so large as to exhaust a typical runner's maximum amount 
of output per
-        // ProcessElement call.
-        .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
-        .build();
-  }
-
-  /**
    * A {@link PTransform} that writes a {@link PCollection} to a text file (or 
multiple text files
    * matching a sharding pattern), with each element of the input collection 
encoded into its own
    * line.
    */
   public static Write write() {
-    return new TextIO.Write();
-  }
-
-  /**
-   * A {@link PTransform} that writes a {@link PCollection} to a text file (or 
multiple text files
-   * matching a sharding pattern), with each element of the input collection 
encoded into its own
-   * line.
-   *
-   * <p>This version allows you to apply {@link TextIO} writes to a 
PCollection of a custom type
-   * {@link T}, along with a format function that converts the input type 
{@link T} to the String
-   * that will be written to the file. The advantage of this is it allows a 
user-provided {@link
-   * DynamicDestinations} object, set via {@link 
Write#to(DynamicDestinations)} to examine the
-   * user's custom type when choosing a destination.
-   */
-  public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, 
String> formatFunction) {
-    return new AutoValue_TextIO_TypedWrite.Builder<T>()
+    return new AutoValue_TextIO_Write.Builder()
         .setFilenamePrefix(null)
-        .setTempDirectory(null)
         .setShardTemplate(null)
         .setFilenameSuffix(null)
         .setFilenamePolicy(null)
-        .setDynamicDestinations(null)
-        .setFormatFunction(formatFunction)
         
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
         .setNumShards(0)
@@ -274,34 +178,29 @@ public class TextIO {
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      return wrapWithCompression(new TextSource(getFilepattern()), 
getCompressionType());
-    }
-
-    private static FileBasedSource<String> wrapWithCompression(
-        FileBasedSource<String> source, CompressionType compressionType) {
-      switch (compressionType) {
+      switch (getCompressionType()) {
         case UNCOMPRESSED:
-          return source;
+          return new TextSource(getFilepattern());
         case AUTO:
-          return CompressedSource.from(source);
+          return CompressedSource.from(new TextSource(getFilepattern()));
         case BZIP2:
           return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.BZIP2);
+              CompressedSource.from(new TextSource(getFilepattern()))
+                  .withDecompression(CompressedSource.CompressionMode.BZIP2);
         case GZIP:
           return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.GZIP);
+              CompressedSource.from(new TextSource(getFilepattern()))
+                  .withDecompression(CompressedSource.CompressionMode.GZIP);
         case ZIP:
           return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.ZIP);
+              CompressedSource.from(new TextSource(getFilepattern()))
+                  .withDecompression(CompressedSource.CompressionMode.ZIP);
         case DEFLATE:
           return
-              CompressedSource.from(source)
-                  .withDecompression(CompressionMode.DEFLATE);
+              CompressedSource.from(new TextSource(getFilepattern()))
+                  .withDecompression(CompressedSource.CompressionMode.DEFLATE);
         default:
-          throw new IllegalArgumentException("Unknown compression type: " + 
compressionType);
+          throw new IllegalArgumentException("Unknown compression type: " + 
getCompressionType());
       }
     }
 
@@ -324,170 +223,18 @@ public class TextIO {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** Implementation of {@link #readAll}. */
-  @AutoValue
-  public abstract static class ReadAll
-      extends PTransform<PCollection<String>, PCollection<String>> {
-    abstract CompressionType getCompressionType();
-    abstract long getDesiredBundleSizeBytes();
-
-    abstract Builder toBuilder();
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setCompressionType(CompressionType compressionType);
-      abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
-
-      abstract ReadAll build();
-    }
-
-    /** Same as {@link Read#withCompressionType(CompressionType)}. */
-    public ReadAll withCompressionType(CompressionType compressionType) {
-      return toBuilder().setCompressionType(compressionType).build();
-    }
-
-    @VisibleForTesting
-    ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
-      return 
toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
-    }
-
-    @Override
-    public PCollection<String> expand(PCollection<String> input) {
-      return input
-          .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
-          .apply(
-              "Split into ranges",
-              ParDo.of(new SplitIntoRangesFn(getCompressionType(), 
getDesiredBundleSizeBytes())))
-          .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, 
OffsetRange>>())
-          .apply("Read", ParDo.of(new ReadTextFn(this)));
-    }
-
-    private static class ReshuffleWithUniqueKey<T>
-        extends PTransform<PCollection<T>, PCollection<T>> {
-      @Override
-      public PCollection<T> expand(PCollection<T> input) {
-        return input
-            .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
-            .apply("Reshuffle", Reshuffle.<Integer, T>of())
-            .apply("Values", Values.<T>create());
-      }
-    }
-
-    private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
-      private int index;
-
-      @Setup
-      public void setup() {
-        this.index = ThreadLocalRandom.current().nextInt();
-      }
-
-      @ProcessElement
-      public void process(ProcessContext c) {
-        c.output(KV.of(++index, c.element()));
-      }
-    }
-
-    private static class ExpandGlobFn extends DoFn<String, Metadata> {
-      @ProcessElement
-      public void process(ProcessContext c) throws Exception {
-        MatchResult match = FileSystems.match(c.element());
-        checkArgument(
-            match.status().equals(Status.OK),
-            "Failed to match filepattern %s: %s",
-            c.element(),
-            match.status());
-        for (Metadata metadata : match.metadata()) {
-          c.output(metadata);
-        }
-      }
-    }
-
-    private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, 
OffsetRange>> {
-      private final CompressionType compressionType;
-      private final long desiredBundleSize;
-
-      private SplitIntoRangesFn(CompressionType compressionType, long 
desiredBundleSize) {
-        this.compressionType = compressionType;
-        this.desiredBundleSize = desiredBundleSize;
-      }
-
-      @ProcessElement
-      public void process(ProcessContext c) {
-        Metadata metadata = c.element();
-        final boolean isSplittable = isSplittable(metadata, compressionType);
-        if (!isSplittable) {
-          c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
-          return;
-        }
-        for (OffsetRange range :
-            new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 
0)) {
-          c.output(KV.of(metadata, range));
-        }
-      }
-
-      static boolean isSplittable(Metadata metadata, CompressionType 
compressionType) {
-        if (!metadata.isReadSeekEfficient()) {
-          return false;
-        }
-        switch (compressionType) {
-          case AUTO:
-            return 
!CompressionMode.isCompressed(metadata.resourceId().toString());
-          case UNCOMPRESSED:
-            return true;
-          case GZIP:
-          case BZIP2:
-          case ZIP:
-          case DEFLATE:
-            return false;
-          default:
-            throw new UnsupportedOperationException("Unknown compression type: 
" + compressionType);
-        }
-      }
-    }
-
-    private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, 
String> {
-      private final TextIO.ReadAll spec;
-
-      private ReadTextFn(ReadAll spec) {
-        this.spec = spec;
-      }
-
-      @ProcessElement
-      public void process(ProcessContext c) throws IOException {
-        Metadata metadata = c.element().getKey();
-        OffsetRange range = c.element().getValue();
-        FileBasedSource<String> source =
-            TextIO.Read.wrapWithCompression(
-                new TextSource(StaticValueProvider.of(metadata.toString())),
-                spec.getCompressionType());
-        BoundedSource.BoundedReader<String> reader =
-            source
-                .createForSubrangeOfFile(metadata, range.getFrom(), 
range.getTo())
-                .createReader(c.getPipelineOptions());
-        for (boolean more = reader.start(); more; more = reader.advance()) {
-          c.output(reader.getCurrent());
-        }
-      }
-    }
-  }
 
   /////////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
   @AutoValue
-  public abstract static class TypedWrite<T> extends 
PTransform<PCollection<T>, PDone> {
+  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
     /** The prefix of each file written, combined with suffix and 
shardTemplate. */
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
 
     /** The suffix of each file written, combined with prefix and 
shardTemplate. */
     @Nullable abstract String getFilenameSuffix();
 
-    /** The base directory used for generating temporary files. */
-    @Nullable
-    abstract ValueProvider<ResourceId> getTempDirectory();
-
     /** An optional header to add to each file. */
     @Nullable abstract String getHeader();
 
@@ -503,13 +250,6 @@ public class TextIO {
     /** A policy for naming output files. */
     @Nullable abstract FilenamePolicy getFilenamePolicy();
 
-    /** Allows for value-dependent {@link DynamicDestinations} to be vended. */
-    @Nullable
-    abstract DynamicDestinations<T, ?> getDynamicDestinations();
-
-    /** A function that converts T to a String, for writing to the file. */
-    abstract SerializableFunction<T, String> getFormatFunction();
-
     /** Whether to write windowed output files. */
     abstract boolean getWindowedWrites();
 
@@ -519,68 +259,66 @@ public class TextIO {
      */
     abstract WritableByteChannelFactory getWritableByteChannelFactory();
 
-    abstract Builder<T> toBuilder();
+    abstract Builder toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> 
filenamePrefix);
-
-      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> 
tempDirectory);
-
-      abstract Builder<T> setShardTemplate(@Nullable String shardTemplate);
-
-      abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix);
-
-      abstract Builder<T> setHeader(@Nullable String header);
-
-      abstract Builder<T> setFooter(@Nullable String footer);
-
-      abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy 
filenamePolicy);
-
-      abstract Builder<T> setDynamicDestinations(
-          @Nullable DynamicDestinations<T, ?> dynamicDestinations);
-
-      abstract Builder<T> setFormatFunction(SerializableFunction<T, String> 
formatFunction);
-
-      abstract Builder<T> setNumShards(int numShards);
-
-      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
-
-      abstract Builder<T> setWritableByteChannelFactory(
+    abstract static class Builder {
+      abstract Builder setFilenamePrefix(ValueProvider<ResourceId> 
filenamePrefix);
+      abstract Builder setShardTemplate(@Nullable String shardTemplate);
+      abstract Builder setFilenameSuffix(@Nullable String filenameSuffix);
+      abstract Builder setHeader(@Nullable String header);
+      abstract Builder setFooter(@Nullable String footer);
+      abstract Builder setFilenamePolicy(@Nullable FilenamePolicy 
filenamePolicy);
+      abstract Builder setNumShards(int numShards);
+      abstract Builder setWindowedWrites(boolean windowedWrites);
+      abstract Builder setWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory);
 
-      abstract TypedWrite<T> build();
+      abstract Write build();
     }
 
     /**
-     * Writes to text files with the given prefix. The given {@code prefix} 
can reference any {@link
-     * FileSystem} on the classpath. This prefix is used by the {@link 
DefaultFilenamePolicy} to
-     * generate filenames.
+     * Writes to text files with the given prefix. The given {@code prefix} 
can reference any
+     * {@link FileSystem} on the classpath.
      *
-     * <p>By default, a {@link DefaultFilenamePolicy} will be used built using 
the specified prefix
-     * to define the base output directory and file prefix, a shard identifier 
(see {@link
-     * #withNumShards(int)}), and a common suffix (if supplied using {@link 
#withSuffix(String)}).
+     * <p>The name of the output files will be determined by the {@link 
FilenamePolicy} used.
      *
-     * <p>This default policy can be overridden using {@link 
#to(FilenamePolicy)}, in which case
-     * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} 
should not be set.
-     * Custom filename policies do not automatically see this prefix - you 
should explicitly pass
-     * the prefix into your {@link FilenamePolicy} object if you need this.
+     * <p>By default, a {@link DefaultFilenamePolicy} will be used built using 
the specified prefix
+     * to define the base output directory and file prefix, a shard identifier 
(see
+     * {@link #withNumShards(int)}), and a common suffix (if supplied using
+     * {@link #withSuffix(String)}).
      *
-     * <p>If {@link #withTempDirectory} has not been called, this filename 
prefix will be used to
-     * infer a directory for temporary files.
+     * <p>This default policy can be overridden using {@link 
#withFilenamePolicy(FilenamePolicy)},
+     * in which case {@link #withShardNameTemplate(String)} and {@link 
#withSuffix(String)} should
+     * not be set.
      */
-    public TypedWrite<T> to(String filenamePrefix) {
+    public Write to(String filenamePrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
     }
 
-    /** Like {@link #to(String)}. */
+    /**
+     * Writes to text files with prefix from the given resource.
+     *
+     * <p>The name of the output files will be determined by the {@link 
FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will be used built using 
the specified prefix
+     * to define the base output directory and file prefix, a shard identifier 
(see
+     * {@link #withNumShards(int)}), and a common suffix (if supplied using
+     * {@link #withSuffix(String)}).
+     *
+     * <p>This default policy can be overridden using {@link 
#withFilenamePolicy(FilenamePolicy)},
+     * in which case {@link #withShardNameTemplate(String)} and {@link 
#withSuffix(String)} should
+     * not be set.
+     */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> to(ResourceId filenamePrefix) {
+    public Write to(ResourceId filenamePrefix) {
       return toResource(StaticValueProvider.of(filenamePrefix));
     }
 
-    /** Like {@link #to(String)}. */
-    public TypedWrite<T> to(ValueProvider<String> outputPrefix) {
+    /**
+     * Like {@link #to(String)}.
+     */
+    public Write to(ValueProvider<String> outputPrefix) {
       return toResource(NestedValueProvider.of(outputPrefix,
           new SerializableFunction<String, ResourceId>() {
             @Override
@@ -591,77 +329,43 @@ public class TextIO {
     }
 
     /**
-     * Writes to files named according to the given {@link 
FileBasedSink.FilenamePolicy}. A
-     * directory for temporary files must be specified using {@link 
#withTempDirectory}.
+     * Like {@link #to(ResourceId)}.
      */
-    public TypedWrite<T> to(FilenamePolicy filenamePolicy) {
-      return toBuilder().setFilenamePolicy(filenamePolicy).build();
-    }
-
-    /**
-     * Use a {@link DynamicDestinations} object to vend {@link FilenamePolicy} 
objects. These
-     * objects can examine the input record when creating a {@link 
FilenamePolicy}. A directory for
-     * temporary files must be specified using {@link #withTempDirectory}.
-     */
-    public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
-      return toBuilder().setDynamicDestinations(dynamicDestinations).build();
-    }
-
-    /**
-     * Write to dynamic destinations using the default filename policy. The 
destinationFunction maps
-     * the input record to a {@link DefaultFilenamePolicy.Params} object that 
specifies where the
-     * records should be written (base filename, file suffix, and shard 
template). The
-     * emptyDestination parameter specified where empty files should be 
written for when the written
-     * {@link PCollection} is empty.
-     */
-    public TypedWrite<T> to(
-        SerializableFunction<T, Params> destinationFunction, Params 
emptyDestination) {
-      return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, 
emptyDestination));
-    }
-
-    /** Like {@link #to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) {
+    public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
       return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
-    /** Set the base directory used to generate temporary files. */
-    @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> 
tempDirectory) {
-      return toBuilder().setTempDirectory(tempDirectory).build();
-    }
-
-    /** Set the base directory used to generate temporary files. */
-    @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) {
-      return withTempDirectory(StaticValueProvider.of(tempDirectory));
-    }
-
     /**
      * Uses the given {@link ShardNameTemplate} for naming output files. This 
option may only be
-     * used when using one of the default filename-prefix to() overrides - 
i.e. not when using
-     * either {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
+     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been 
configured.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name 
template, and suffix are
      * used.
      */
-    public TypedWrite<T> withShardNameTemplate(String shardTemplate) {
+    public Write withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
     /**
-     * Configures the filename suffix for written files. This option may only 
be used when using one
-     * of the default filename-prefix to() overrides - i.e. not when using 
either {@link
-     * #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
+     * Configures the filename suffix for written files. This option may only 
be used when
+     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name 
template, and suffix are
      * used.
      */
-    public TypedWrite<T> withSuffix(String filenameSuffix) {
+    public Write withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
+     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used 
to name written files.
+     */
+    public Write withFilenamePolicy(FilenamePolicy filenamePolicy) {
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
+    }
+
+    /**
      * Configures the number of output shards produced overall (when using 
unwindowed writes) or
      * per-window (when using windowed writes).
      *
@@ -671,13 +375,14 @@ public class TextIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system 
decide.
      */
-    public TypedWrite<T> withNumShards(int numShards) {
+    public Write withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
 
     /**
-     * Forces a single file as output and empty shard name template.
+     * Forces a single file as output and empty shard name template. This 
option is only compatible
+     * with unwindowed writes.
      *
      * <p>For unwindowed writes, constraining the number of shards is likely 
to reduce the
      * performance of a pipeline. Setting this value is not recommended unless 
you require a
@@ -685,7 +390,7 @@ public class TextIO {
      *
      * <p>This is equivalent to {@code 
.withNumShards(1).withShardNameTemplate("")}
      */
-    public TypedWrite<T> withoutSharding() {
+    public Write withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
@@ -694,7 +399,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured header.
      */
-    public TypedWrite<T> withHeader(@Nullable String header) {
+    public Write withHeader(@Nullable String header) {
       return toBuilder().setHeader(header).build();
     }
 
@@ -703,82 +408,48 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured footer.
      */
-    public TypedWrite<T> withFooter(@Nullable String footer) {
+    public Write withFooter(@Nullable String footer) {
       return toBuilder().setFooter(footer).build();
     }
 
     /**
-     * Returns a transform for writing to text files like this one but that 
has the given {@link
-     * WritableByteChannelFactory} to be used by the {@link FileBasedSink} 
during output. The
-     * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     * Returns a transform for writing to text files like this one but that 
has the given
+     * {@link WritableByteChannelFactory} to be used by the {@link 
FileBasedSink} during output.
+     * The default value is {@link 
FileBasedSink.CompressionType#UNCOMPRESSED}.
      *
      * <p>A {@code null} value will reset the value to the default value 
mentioned above.
      */
-    public TypedWrite<T> withWritableByteChannelFactory(
+    public Write withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
       return 
toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
     }
 
-    /**
-     * Preserves windowing of input elements and writes them to files based on 
the element's window.
-     *
-     * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will 
be generated using
-     * {@link FilenamePolicy#windowedFilename}. See also {@link 
WriteFiles#withWindowedWrites()}.
-     */
-    public TypedWrite<T> withWindowedWrites() {
+    public Write withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }
 
-    private DynamicDestinations<T, ?> resolveDynamicDestinations() {
-      DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
-      if (dynamicDestinations == null) {
-        FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
-        if (usedFilenamePolicy == null) {
-          usedFilenamePolicy =
-              DefaultFilenamePolicy.fromStandardParameters(
-                  getFilenamePrefix(),
-                  getShardTemplate(),
-                  getFilenameSuffix(),
-                  getWindowedWrites());
-        }
-        dynamicDestinations = 
DynamicFileDestinations.constant(usedFilenamePolicy);
-      }
-      return dynamicDestinations;
-    }
-
     @Override
-    public PDone expand(PCollection<T> input) {
-      checkState(
-          getFilenamePrefix() != null || getTempDirectory() != null,
-          "Need to set either the filename prefix or the tempDirectory of a 
TextIO.Write "
-              + "transform.");
+    public PDone expand(PCollection<String> input) {
+      checkState(getFilenamePrefix() != null,
+          "Need to set the filename prefix of a TextIO.Write transform.");
       checkState(
-          getFilenamePolicy() == null || getDynamicDestinations() == null,
-          "Cannot specify both a filename policy and dynamic destinations");
-      if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
-        checkState(
-            getShardTemplate() == null && getFilenameSuffix() == null,
-            "shardTemplate and filenameSuffix should only be used with the 
default "
-                + "filename policy");
+          (getFilenamePolicy() == null)
+              || (getShardTemplate() == null && getFilenameSuffix() == null),
+          "Cannot set a filename policy and also a filename template or 
suffix.");
+
+      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+      if (usedFilenamePolicy == null) {
+        usedFilenamePolicy = 
DefaultFilenamePolicy.constructUsingStandardParameters(
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), 
getWindowedWrites());
       }
-      return expandTyped(input, resolveDynamicDestinations());
-    }
-
-    public <DestinationT> PDone expandTyped(
-        PCollection<T> input, DynamicDestinations<T, DestinationT> 
dynamicDestinations) {
-      ValueProvider<ResourceId> tempDirectory = getTempDirectory();
-      if (tempDirectory == null) {
-        tempDirectory = getFilenamePrefix();
-      }
-      WriteFiles<T, DestinationT, String> write =
+      WriteFiles<String> write =
           WriteFiles.to(
-              new TextSink<>(
-                  tempDirectory,
-                  dynamicDestinations,
+              new TextSink(
+                  getFilenamePrefix(),
+                  usedFilenamePolicy,
                   getHeader(),
                   getFooter(),
-                  getWritableByteChannelFactory()),
-              getFormatFunction());
+                  getWritableByteChannelFactory()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -792,26 +463,27 @@ public class TextIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      resolveDynamicDestinations().populateDisplayData(builder);
-      String tempDirectory = null;
-      if (getTempDirectory() != null) {
-        tempDirectory =
-            getTempDirectory().isAccessible()
-                ? getTempDirectory().get().toString()
-                : getTempDirectory().toString();
+      String prefixString = "";
+      if (getFilenamePrefix() != null) {
+        prefixString = getFilenamePrefix().isAccessible()
+            ? getFilenamePrefix().get().toString() : 
getFilenamePrefix().toString();
       }
       builder
-          .addIfNotDefault(
-              DisplayData.item("numShards", getNumShards()).withLabel("Maximum 
Output Shards"), 0)
-          .addIfNotNull(
-              DisplayData.item("tempDirectory", tempDirectory)
-                  .withLabel("Directory for temporary files"))
-          .addIfNotNull(DisplayData.item("fileHeader", 
getHeader()).withLabel("File Header"))
-          .addIfNotNull(DisplayData.item("fileFooter", 
getFooter()).withLabel("File Footer"))
-          .add(
-              DisplayData.item(
-                      "writableByteChannelFactory", 
getWritableByteChannelFactory().toString())
-                  .withLabel("Compression/Transformation Type"));
+          .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+            .withLabel("Output File Prefix"))
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+            .withLabel("Output File Suffix"))
+          .addIfNotNull(DisplayData.item("shardNameTemplate", 
getShardTemplate())
+            .withLabel("Output Shard Name Template"))
+          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
+            .withLabel("Maximum Output Shards"), 0)
+          .addIfNotNull(DisplayData.item("fileHeader", getHeader())
+            .withLabel("File Header"))
+          .addIfNotNull(DisplayData.item("fileFooter", getFooter())
+              .withLabel("File Footer"))
+          .add(DisplayData
+              .item("writableByteChannelFactory", 
getWritableByteChannelFactory().toString())
+              .withLabel("Compression/Transformation Type"));
     }
 
     @Override
@@ -821,128 +493,6 @@ public class TextIO {
   }
 
   /**
-   * This class is used as the default return value of {@link TextIO#write()}.
-   *
-   * <p>All methods in this class delegate to the appropriate method of {@link 
TextIO.TypedWrite}.
-   * This class exists for backwards compatibility, and will be removed in 
Beam 3.0.
-   */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
-    @VisibleForTesting TypedWrite<String> inner;
-
-    Write() {
-      this(TextIO.writeCustomType(SerializableFunctions.<String>identity()));
-    }
-
-    Write(TypedWrite<String> inner) {
-      this.inner = inner;
-    }
-
-    /** See {@link TypedWrite#to(String)}. */
-    public Write to(String filenamePrefix) {
-      return new Write(inner.to(filenamePrefix));
-    }
-
-    /** See {@link TypedWrite#to(ResourceId)}. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write to(ResourceId filenamePrefix) {
-      return new Write(inner.to(filenamePrefix));
-    }
-
-    /** See {@link TypedWrite#to(ValueProvider)}. */
-    public Write to(ValueProvider<String> outputPrefix) {
-      return new Write(inner.to(outputPrefix));
-    }
-
-    /** See {@link TypedWrite#toResource(ValueProvider)}. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
-      return new Write(inner.toResource(filenamePrefix));
-    }
-
-    /** See {@link TypedWrite#to(FilenamePolicy)}. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write to(FilenamePolicy filenamePolicy) {
-      return new Write(inner.to(filenamePolicy));
-    }
-
-    /** See {@link TypedWrite#to(DynamicDestinations)}. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write to(DynamicDestinations<String, ?> dynamicDestinations) {
-      return new Write(inner.to(dynamicDestinations));
-    }
-
-    /** See {@link TypedWrite#to(SerializableFunction, Params)}. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write to(
-        SerializableFunction<String, Params> destinationFunction, Params 
emptyDestination) {
-      return new Write(inner.to(destinationFunction, emptyDestination));
-    }
-
-    /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
-      return new Write(inner.withTempDirectory(tempDirectory));
-    }
-
-    /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
-    @Experimental(Kind.FILESYSTEM)
-    public Write withTempDirectory(ResourceId tempDirectory) {
-      return new Write(inner.withTempDirectory(tempDirectory));
-    }
-
-    /** See {@link TypedWrite#withShardNameTemplate(String)}. */
-    public Write withShardNameTemplate(String shardTemplate) {
-      return new Write(inner.withShardNameTemplate(shardTemplate));
-    }
-
-    /** See {@link TypedWrite#withSuffix(String)}. */
-    public Write withSuffix(String filenameSuffix) {
-      return new Write(inner.withSuffix(filenameSuffix));
-    }
-
-    /** See {@link TypedWrite#withNumShards(int)}. */
-    public Write withNumShards(int numShards) {
-      return new Write(inner.withNumShards(numShards));
-    }
-
-    /** See {@link TypedWrite#withoutSharding()}. */
-    public Write withoutSharding() {
-      return new Write(inner.withoutSharding());
-    }
-
-    /** See {@link TypedWrite#withHeader(String)}. */
-    public Write withHeader(@Nullable String header) {
-      return new Write(inner.withHeader(header));
-    }
-
-    /** See {@link TypedWrite#withFooter(String)}. */
-    public Write withFooter(@Nullable String footer) {
-      return new Write(inner.withFooter(footer));
-    }
-
-    /** See {@link 
TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */
-    public Write withWritableByteChannelFactory(
-        WritableByteChannelFactory writableByteChannelFactory) {
-      return new 
Write(inner.withWritableByteChannelFactory(writableByteChannelFactory));
-    }
-
-    /** See {@link TypedWrite#withWindowedWrites}. */
-    public Write withWindowedWrites() {
-      return new Write(inner.withWindowedWrites());
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      inner.populateDisplayData(builder);
-    }
-
-    @Override
-    public PDone expand(PCollection<String> input) {
-      return inner.expand(input);
-    }
-  }
-
-  /**
    * Possible text file compression types.
    */
   public enum CompressionType {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index b57b28c..511d697 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,29 +34,27 @@ import org.apache.beam.sdk.util.MimeTypes;
  * '\n'} represented in {@code UTF-8} format as the record separator. Each 
record (including the
  * last) is terminated.
  */
-class TextSink<UserT, DestinationT> extends FileBasedSink<String, 
DestinationT> {
+class TextSink extends FileBasedSink<String> {
   @Nullable private final String header;
   @Nullable private final String footer;
 
   TextSink(
       ValueProvider<ResourceId> baseOutputFilename,
-      DynamicDestinations<UserT, DestinationT> dynamicDestinations,
+      FilenamePolicy filenamePolicy,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
-    super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory);
+    super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
     this.header = header;
     this.footer = footer;
   }
-
   @Override
-  public WriteOperation<String, DestinationT> createWriteOperation() {
-    return new TextWriteOperation<>(this, header, footer);
+  public WriteOperation<String> createWriteOperation() {
+    return new TextWriteOperation(this, header, footer);
   }
 
   /** A {@link WriteOperation WriteOperation} for text files. */
-  private static class TextWriteOperation<DestinationT>
-      extends WriteOperation<String, DestinationT> {
+  private static class TextWriteOperation extends WriteOperation<String> {
     @Nullable private final String header;
     @Nullable private final String footer;
 
@@ -67,20 +65,20 @@ class TextSink<UserT, DestinationT> extends 
FileBasedSink<String, DestinationT>
     }
 
     @Override
-    public Writer<String, DestinationT> createWriter() throws Exception {
-      return new TextWriter<>(this, header, footer);
+    public Writer<String> createWriter() throws Exception {
+      return new TextWriter(this, header, footer);
     }
   }
 
   /** A {@link Writer Writer} for text files. */
-  private static class TextWriter<DestinationT> extends Writer<String, 
DestinationT> {
+  private static class TextWriter extends Writer<String> {
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
     private OutputStreamWriter out;
 
     public TextWriter(
-        WriteOperation<String, DestinationT> writeOperation,
+        WriteOperation<String> writeOperation,
         @Nullable String header,
         @Nullable String footer) {
       super(writeOperation, MimeTypes.TEXT);

Reply via email to