http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 3bf5d5b..4e2b61c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -27,8 +27,10 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParameters;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,7 +42,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +68,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
@@ -95,9 +98,9 @@ import org.slf4j.LoggerFactory;
  * <p>The process of writing to file-based sink is as follows:
  *
  * <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
+ *   <li>An optional subclass-defined initialization,
+ *   <li>a parallel write of bundles to temporary files, and finally,
+ *   <li>these temporary files are renamed with final output filenames.
  * </ol>
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
@@ -121,7 +124,8 @@ import org.slf4j.LoggerFactory;
  * @param <OutputT> the type of values written to the sink.
  */
 @Experimental(Kind.FILESYSTEM)
-public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
+public abstract class FileBasedSink<UserT, DestinationT, OutputT>
+    implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
   /** Directly supported file output compression types. */
@@ -199,7 +203,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
     }
   }
 
-  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations;
 
   /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data 
output to the
@@ -215,8 +219,54 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
    * destination type into an instance of {@link FilenamePolicy}.
    */
   @Experimental(Kind.FILESYSTEM)
-  public abstract static class DynamicDestinations<UserT, DestinationT>
+  public abstract static class DynamicDestinations<UserT, DestinationT, OutputT>
       implements HasDisplayData, Serializable {
+    interface SideInputAccessor {
+      <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view);
+    }
+
+    private SideInputAccessor sideInputAccessor;
+
+    static class SideInputAccessorViaProcessContext implements 
SideInputAccessor {
+      private DoFn<?, ?>.ProcessContext processContext;
+
+      SideInputAccessorViaProcessContext(DoFn<?, ?>.ProcessContext 
processContext) {
+        this.processContext = processContext;
+      }
+
+      @Override
+      public <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> 
view) {
+        return processContext.sideInput(view);
+      }
+    }
+
+    /**
+     * Override to specify that this object needs access to one or more side 
inputs. This side
+     * inputs must be globally windowed, as they will be accessed from the 
global window.
+     */
+    public List<PCollectionView<?>> getSideInputs() {
+      return ImmutableList.of();
+    }
+
+    /**
+     * Returns the value of a given side input. The view must be present in 
{@link
+     * #getSideInputs()}.
+     */
+    protected final <SideInputT> SideInputT 
sideInput(PCollectionView<SideInputT> view) {
+      return sideInputAccessor.sideInput(view);
+    }
+
+    final void setSideInputAccessor(SideInputAccessor sideInputAccessor) {
+      this.sideInputAccessor = sideInputAccessor;
+    }
+
+    final void setSideInputAccessorFromProcessContext(DoFn<?, 
?>.ProcessContext context) {
+      this.sideInputAccessor = new SideInputAccessorViaProcessContext(context);
+    }
+
+    /** Convert an input record type into the output type. */
+    public abstract OutputT formatRecord(UserT record);
+
     /**
      * Returns an object that represents at a high level the destination being 
written to. May not
      * return null. A destination must have deterministic hash and equality 
methods defined.
@@ -256,12 +306,13 @@ public abstract class FileBasedSink<OutputT, 
DestinationT> implements Serializab
         return destinationCoder;
       }
       // If dynamicDestinations doesn't provide a coder, try to find it in the 
coder registry.
-      @Nullable TypeDescriptor<DestinationT> descriptor =
+      @Nullable
+      TypeDescriptor<DestinationT> descriptor =
           extractFromTypeParameters(
               this,
               DynamicDestinations.class,
               new TypeVariableExtractor<
-                  DynamicDestinations<UserT, DestinationT>, DestinationT>() 
{});
+                  DynamicDestinations<UserT, DestinationT, OutputT>, 
DestinationT>() {});
       checkArgument(
           descriptor != null,
           "Unable to infer a coder for DestinationT, "
@@ -323,7 +374,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
   @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
       ValueProvider<ResourceId> tempDirectoryProvider,
-      DynamicDestinations<?, DestinationT> dynamicDestinations) {
+      DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) {
     this(tempDirectoryProvider, dynamicDestinations, 
CompressionType.UNCOMPRESSED);
   }
 
@@ -331,7 +382,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
   @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
       ValueProvider<ResourceId> tempDirectoryProvider,
-      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations,
       WritableByteChannelFactory writableByteChannelFactory) {
     this.tempDirectoryProvider =
         NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
@@ -341,8 +392,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
 
   /** Return the {@link DynamicDestinations} used. */
   @SuppressWarnings("unchecked")
-  public <UserT> DynamicDestinations<UserT, DestinationT> 
getDynamicDestinations() {
-    return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations;
+  public DynamicDestinations<UserT, DestinationT, OutputT> 
getDynamicDestinations() {
+    return (DynamicDestinations<UserT, DestinationT, OutputT>) 
dynamicDestinations;
   }
 
   /**
@@ -357,7 +408,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
   public void validate(PipelineOptions options) {}
 
   /** Return a subclass of {@link WriteOperation} that will manage the write 
to the sink. */
-  public abstract WriteOperation<OutputT, DestinationT> createWriteOperation();
+  public abstract WriteOperation<DestinationT, OutputT> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
     getDynamicDestinations().populateDisplayData(builder);
@@ -371,11 +422,11 @@ public abstract class FileBasedSink<OutputT, 
DestinationT> implements Serializab
    * written,
    *
    * <ol>
-   * <li>{@link WriteOperation#finalize} is given a list of the temporary 
files containing the
-   *     output bundles.
-   * <li>During finalize, these temporary files are copied to final output 
locations and named
-   *     according to a file naming template.
-   * <li>Finally, any temporary files that were created during the write are 
removed.
+   *   <li>{@link WriteOperation#finalize} is given a list of the temporary 
files containing the
+   *       output bundles.
+   *   <li>During finalize, these temporary files are copied to final output 
locations and named
+   *       according to a file naming template.
+   *   <li>Finally, any temporary files that were created during the write are 
removed.
    * </ol>
    *
    * <p>Subclass implementations of WriteOperation must implement {@link
@@ -400,9 +451,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
    *
    * @param <OutputT> the type of values written to the sink.
    */
-  public abstract static class WriteOperation<OutputT, DestinationT> 
implements Serializable {
+  public abstract static class WriteOperation<DestinationT, OutputT> 
implements Serializable {
     /** The Sink that this WriteOperation will write to. */
-    protected final FileBasedSink<OutputT, DestinationT> sink;
+    protected final FileBasedSink<?, DestinationT, OutputT> sink;
 
     /** Directory for temporary output files. */
     protected final ValueProvider<ResourceId> tempDirectory;
@@ -428,7 +479,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
      *
      * @param sink the FileBasedSink that will be used to configure this write 
operation.
      */
-    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) {
+    public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink) {
       this(
           sink,
           NestedValueProvider.of(sink.getTempDirectoryProvider(), new 
TemporaryDirectoryBuilder()));
@@ -463,12 +514,12 @@ public abstract class FileBasedSink<OutputT, 
DestinationT> implements Serializab
      * @param tempDirectory the base directory to be used for temporary output 
files.
      */
     @Experimental(Kind.FILESYSTEM)
-    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, 
ResourceId tempDirectory) {
+    public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink, 
ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
 
     private WriteOperation(
-        FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> 
tempDirectory) {
+        FileBasedSink<?, DestinationT, OutputT> sink, 
ValueProvider<ResourceId> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
       this.windowedWrites = false;
@@ -478,7 +529,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
      * Clients must implement to return a subclass of {@link Writer}. This 
method must not mutate
      * the state of the object.
      */
-    public abstract Writer<OutputT, DestinationT> createWriter() throws 
Exception;
+    public abstract Writer<DestinationT, OutputT> createWriter() throws 
Exception;
 
     /** Indicates that the operation will be performing windowed writes. */
     public void setWindowedWrites(boolean windowedWrites) {
@@ -533,7 +584,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
     protected final Map<ResourceId, ResourceId> buildOutputFilenames(
         Iterable<FileResult<DestinationT>> writerResults) {
       int numShards = Iterables.size(writerResults);
-      Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
+      Map<ResourceId, ResourceId> outputFilenames = Maps.newHashMap();
 
       // Either all results have a shard number set (if the sink is configured 
with a fixed
       // number of shards), or they all don't (otherwise).
@@ -597,7 +648,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
           "Only generated %s distinct file names for %s files.",
           numDistinctShards,
           outputFilenames.size());
-
       return outputFilenames;
     }
 
@@ -691,7 +741,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
     }
 
     /** Returns the FileBasedSink for this write operation. */
-    public FileBasedSink<OutputT, DestinationT> getSink() {
+    public FileBasedSink<?, DestinationT, OutputT> getSink() {
       return sink;
     }
 
@@ -727,10 +777,10 @@ public abstract class FileBasedSink<OutputT, 
DestinationT> implements Serializab
    *
    * @param <OutputT> the type of values to write.
    */
-  public abstract static class Writer<OutputT, DestinationT> {
+  public abstract static class Writer<DestinationT, OutputT> {
     private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
-    private final WriteOperation<OutputT, DestinationT> writeOperation;
+    private final WriteOperation<DestinationT, OutputT> writeOperation;
 
     /** Unique id for this output bundle. */
     private String id;
@@ -757,7 +807,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
     private final String mimeType;
 
     /** Construct a new {@link Writer} that will produce files of the given 
MIME type. */
-    public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String 
mimeType) {
+    public Writer(WriteOperation<DestinationT, OutputT> writeOperation, String 
mimeType) {
       checkNotNull(writeOperation);
       this.writeOperation = writeOperation;
       this.mimeType = mimeType;
@@ -930,9 +980,14 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
     }
 
     /** Return the WriteOperation that this Writer belongs to. */
-    public WriteOperation<OutputT, DestinationT> getWriteOperation() {
+    public WriteOperation<DestinationT, OutputT> getWriteOperation() {
       return writeOperation;
     }
+
+    /** Return the user destination object for this writer. */
+    public DestinationT getDestination() {
+      return destination;
+    }
   }
 
   /**
@@ -987,7 +1042,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> 
implements Serializab
 
     @Experimental(Kind.FILESYSTEM)
     public ResourceId getDestinationFile(
-        DynamicDestinations<?, DestinationT> dynamicDestinations,
+        DynamicDestinations<?, DestinationT, ?> dynamicDestinations,
         int numShards,
         OutputFileHints outputFileHints) {
       checkArgument(getShard() != UNKNOWN_SHARDNUM);

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 6e7b243..29b3e29 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -357,10 +356,12 @@ public class TFRecordIO {
       checkState(getOutputPrefix() != null,
           "need to set the output prefix of a TFRecordIO.Write transform");
       WriteFiles<byte[], Void, byte[]> write =
-          WriteFiles.<byte[], Void, byte[]>to(
+          WriteFiles.to(
               new TFRecordSink(
-                  getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), 
getCompressionType()),
-              SerializableFunctions.<byte[]>identity());
+                  getOutputPrefix(),
+                  getShardTemplate(),
+                  getFilenameSuffix(),
+                  getCompressionType()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -548,7 +549,7 @@ public class TFRecordIO {
 
   /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */
   @VisibleForTesting
-  static class TFRecordSink extends FileBasedSink<byte[], Void> {
+  static class TFRecordSink extends FileBasedSink<byte[], Void, byte[]> {
     @VisibleForTesting
     TFRecordSink(
         ValueProvider<ResourceId> outputPrefix,
@@ -557,7 +558,7 @@ public class TFRecordIO {
         TFRecordIO.CompressionType compressionType) {
       super(
           outputPrefix,
-          DynamicFileDestinations.constant(
+          DynamicFileDestinations.<byte[]>constant(
               DefaultFilenamePolicy.fromStandardParameters(
                   outputPrefix, shardTemplate, suffix, false)),
           writableByteChannelFactory(compressionType));
@@ -571,7 +572,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public WriteOperation<byte[], Void> createWriteOperation() {
+    public WriteOperation<Void, byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
     }
 
@@ -591,23 +592,23 @@ public class TFRecordIO {
     }
 
     /** A {@link WriteOperation WriteOperation} for TFRecord files. */
-    private static class TFRecordWriteOperation extends WriteOperation<byte[], 
Void> {
+    private static class TFRecordWriteOperation extends WriteOperation<Void, 
byte[]> {
       private TFRecordWriteOperation(TFRecordSink sink) {
         super(sink);
       }
 
       @Override
-      public Writer<byte[], Void> createWriter() throws Exception {
+      public Writer<Void, byte[]> createWriter() throws Exception {
         return new TFRecordWriter(this);
       }
     }
 
     /** A {@link Writer Writer} for TFRecord files. */
-    private static class TFRecordWriter extends Writer<byte[], Void> {
+    private static class TFRecordWriter extends Writer<Void, byte[]> {
       private WritableByteChannel outChannel;
       private TFRecordCodec codec;
 
-      private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) {
+      private TFRecordWriter(WriteOperation<Void, byte[]> writeOperation) {
         super(writeOperation, MimeTypes.BINARY);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 765a842..312dc07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,6 +23,10 @@ import static 
com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -52,8 +56,8 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>To read a {@link PCollection} from one or more text files, use {@code 
TextIO.read()} to
  * instantiate a transform and use {@link TextIO.Read#from(String)} to specify 
the path of the
- * file(s) to be read. Alternatively, if the filenames to be read are 
themselves in a
- * {@link PCollection}, apply {@link TextIO#readAll()}.
+ * file(s) to be read. Alternatively, if the filenames to be read are 
themselves in a {@link
+ * PCollection}, apply {@link TextIO#readAll()}.
  *
  * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String 
Strings}, each
  * corresponding to one line of an input UTF-8 text file (split into lines 
delimited by '\n', '\r',
@@ -70,8 +74,8 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>If it is known that the filepattern will match a very large number of 
files (e.g. tens of
  * thousands or more), use {@link Read#withHintMatchesManyFiles} for better 
performance and
- * scalability. Note that it may decrease performance if the filepattern 
matches only a small
- * number of files.
+ * scalability. Note that it may decrease performance if the filepattern 
matches only a small number
+ * of files.
  *
  * <p>Example 2: reading a PCollection of filenames.
  *
@@ -121,9 +125,9 @@ import org.apache.beam.sdk.values.PDone;
  * allows you to convert any input value into a custom destination object, and 
map that destination
  * object to a {@link FilenamePolicy}. This allows using different filename 
policies (or more
  * commonly, differently-configured instances of the same policy) based on the 
input record. Often
- * this is used in conjunction with {@link 
TextIO#writeCustomType(SerializableFunction)}, which
- * allows your {@link DynamicDestinations} object to examine the input type 
and takes a format
- * function to convert that type to a string for writing.
+ * this is used in conjunction with {@link TextIO#writeCustomType}, which 
allows your {@link
+ * DynamicDestinations} object to examine the input type and takes a format 
function to convert that
+ * type to a string for writing.
  *
  * <p>A convenience shortcut is provided for the case where the default naming 
policy is used, but
  * different configurations of this policy are wanted based on the input 
record. Default naming
@@ -189,20 +193,23 @@ public class TextIO {
    * line.
    *
    * <p>This version allows you to apply {@link TextIO} writes to a 
PCollection of a custom type
-   * {@link T}, along with a format function that converts the input type 
{@link T} to the String
-   * that will be written to the file. The advantage of this is it allows a 
user-provided {@link
+   * {@link UserT}. A format mechanism that converts the input type {@link 
UserT} to the String that
+   * will be written to the file must be specified. If using a custom {@link 
DynamicDestinations}
+   * object this is done using {@link DynamicDestinations#formatRecord}, 
otherwise the {@link
+   * TypedWrite#withFormatFunction} can be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that is it allows a 
user-provided {@link
    * DynamicDestinations} object, set via {@link 
Write#to(DynamicDestinations)} to examine the
-   * user's custom type when choosing a destination.
+   * custom type when choosing a destination.
    */
-  public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, 
String> formatFunction) {
-    return new AutoValue_TextIO_TypedWrite.Builder<T>()
+  public static <UserT> TypedWrite<UserT> writeCustomType() {
+    return new AutoValue_TextIO_TypedWrite.Builder<UserT>()
         .setFilenamePrefix(null)
         .setTempDirectory(null)
         .setShardTemplate(null)
         .setFilenameSuffix(null)
         .setFilenamePolicy(null)
         .setDynamicDestinations(null)
-        .setFormatFunction(formatFunction)
         
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
         .setNumShards(0)
@@ -417,11 +424,11 @@ public class TextIO {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
+  // 
///////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
   @AutoValue
-  public abstract static class TypedWrite<T> extends 
PTransform<PCollection<T>, PDone> {
+  public abstract static class TypedWrite<UserT> extends 
PTransform<PCollection<UserT>, PDone> {
     /** The prefix of each file written, combined with suffix and 
shardTemplate. */
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
 
@@ -449,10 +456,19 @@ public class TextIO {
 
     /** Allows for value-dependent {@link DynamicDestinations} to be vended. */
     @Nullable
-    abstract DynamicDestinations<T, ?> getDynamicDestinations();
+    abstract DynamicDestinations<UserT, ?, String> getDynamicDestinations();
+
+    @Nullable
+    /** A destination function for using {@link DefaultFilenamePolicy} */
+    abstract SerializableFunction<UserT, Params> getDestinationFunction();
 
-    /** A function that converts T to a String, for writing to the file. */
-    abstract SerializableFunction<T, String> getFormatFunction();
+    @Nullable
+    /** A default destination for empty PCollections. */
+    abstract Params getEmptyDestination();
+
+    /** A function that converts UserT to a String, for writing to the file. */
+    @Nullable
+    abstract SerializableFunction<UserT, String> getFormatFunction();
 
     /** Whether to write windowed output files. */
     abstract boolean getWindowedWrites();
@@ -463,37 +479,42 @@ public class TextIO {
      */
     abstract WritableByteChannelFactory getWritableByteChannelFactory();
 
-    abstract Builder<T> toBuilder();
+    abstract Builder<UserT> toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> 
filenamePrefix);
+    abstract static class Builder<UserT> {
+      abstract Builder<UserT> setFilenamePrefix(ValueProvider<ResourceId> 
filenamePrefix);
+
+      abstract Builder<UserT> setTempDirectory(ValueProvider<ResourceId> 
tempDirectory);
 
-      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> 
tempDirectory);
+      abstract Builder<UserT> setShardTemplate(@Nullable String shardTemplate);
 
-      abstract Builder<T> setShardTemplate(@Nullable String shardTemplate);
+      abstract Builder<UserT> setFilenameSuffix(@Nullable String 
filenameSuffix);
 
-      abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix);
+      abstract Builder<UserT> setHeader(@Nullable String header);
 
-      abstract Builder<T> setHeader(@Nullable String header);
+      abstract Builder<UserT> setFooter(@Nullable String footer);
 
-      abstract Builder<T> setFooter(@Nullable String footer);
+      abstract Builder<UserT> setFilenamePolicy(@Nullable FilenamePolicy 
filenamePolicy);
 
-      abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy 
filenamePolicy);
+      abstract Builder<UserT> setDynamicDestinations(
+          @Nullable DynamicDestinations<UserT, ?, String> dynamicDestinations);
 
-      abstract Builder<T> setDynamicDestinations(
-          @Nullable DynamicDestinations<T, ?> dynamicDestinations);
+      abstract Builder<UserT> setDestinationFunction(
+          @Nullable SerializableFunction<UserT, Params> destinationFunction);
 
-      abstract Builder<T> setFormatFunction(SerializableFunction<T, String> 
formatFunction);
+      abstract Builder<UserT> setEmptyDestination(Params emptyDestination);
 
-      abstract Builder<T> setNumShards(int numShards);
+      abstract Builder<UserT> setFormatFunction(SerializableFunction<UserT, 
String> formatFunction);
 
-      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+      abstract Builder<UserT> setNumShards(int numShards);
 
-      abstract Builder<T> setWritableByteChannelFactory(
+      abstract Builder<UserT> setWindowedWrites(boolean windowedWrites);
+
+      abstract Builder<UserT> setWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory);
 
-      abstract TypedWrite<T> build();
+      abstract TypedWrite<UserT> build();
     }
 
     /**
@@ -513,18 +534,18 @@ public class TextIO {
      * <p>If {@link #withTempDirectory} has not been called, this filename 
prefix will be used to
      * infer a directory for temporary files.
      */
-    public TypedWrite<T> to(String filenamePrefix) {
+    public TypedWrite<UserT> to(String filenamePrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
     }
 
     /** Like {@link #to(String)}. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> to(ResourceId filenamePrefix) {
+    public TypedWrite<UserT> to(ResourceId filenamePrefix) {
       return toResource(StaticValueProvider.of(filenamePrefix));
     }
 
     /** Like {@link #to(String)}. */
-    public TypedWrite<T> to(ValueProvider<String> outputPrefix) {
+    public TypedWrite<UserT> to(ValueProvider<String> outputPrefix) {
       return toResource(NestedValueProvider.of(outputPrefix,
           new SerializableFunction<String, ResourceId>() {
             @Override
@@ -538,7 +559,7 @@ public class TextIO {
      * Writes to files named according to the given {@link 
FileBasedSink.FilenamePolicy}. A
      * directory for temporary files must be specified using {@link 
#withTempDirectory}.
      */
-    public TypedWrite<T> to(FilenamePolicy filenamePolicy) {
+    public TypedWrite<UserT> to(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
@@ -547,7 +568,7 @@ public class TextIO {
      * objects can examine the input record when creating a {@link 
FilenamePolicy}. A directory for
      * temporary files must be specified using {@link #withTempDirectory}.
      */
-    public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
+    public TypedWrite<UserT> to(DynamicDestinations<UserT, ?, String> 
dynamicDestinations) {
       return toBuilder().setDynamicDestinations(dynamicDestinations).build();
     }
 
@@ -558,26 +579,39 @@ public class TextIO {
      * emptyDestination parameter specified where empty files should be 
written for when the written
      * {@link PCollection} is empty.
      */
-    public TypedWrite<T> to(
-        SerializableFunction<T, Params> destinationFunction, Params 
emptyDestination) {
-      return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, 
emptyDestination));
+    public TypedWrite<UserT> to(
+        SerializableFunction<UserT, Params> destinationFunction, Params 
emptyDestination) {
+      return toBuilder()
+          .setDestinationFunction(destinationFunction)
+          .setEmptyDestination(emptyDestination)
+          .build();
     }
 
     /** Like {@link #to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) {
+    public TypedWrite<UserT> toResource(ValueProvider<ResourceId> 
filenamePrefix) {
       return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
+    /**
+     * Specifies a format function to convert {@link UserT} to the output 
type. If {@link
+     * #to(DynamicDestinations)} is used, {@link 
DynamicDestinations#formatRecord(Object)} must be
+     * used instead.
+     */
+    public TypedWrite<UserT> withFormatFunction(
+        SerializableFunction<UserT, String> formatFunction) {
+      return toBuilder().setFormatFunction(formatFunction).build();
+    }
+
     /** Set the base directory used to generate temporary files. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> 
tempDirectory) {
+    public TypedWrite<UserT> withTempDirectory(ValueProvider<ResourceId> 
tempDirectory) {
       return toBuilder().setTempDirectory(tempDirectory).build();
     }
 
     /** Set the base directory used to generate temporary files. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) {
+    public TypedWrite<UserT> withTempDirectory(ResourceId tempDirectory) {
       return withTempDirectory(StaticValueProvider.of(tempDirectory));
     }
 
@@ -589,7 +623,7 @@ public class TextIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name 
template, and suffix are
      * used.
      */
-    public TypedWrite<T> withShardNameTemplate(String shardTemplate) {
+    public TypedWrite<UserT> withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
@@ -601,7 +635,7 @@ public class TextIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name 
template, and suffix are
      * used.
      */
-    public TypedWrite<T> withSuffix(String filenameSuffix) {
+    public TypedWrite<UserT> withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
@@ -615,7 +649,7 @@ public class TextIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system 
decide.
      */
-    public TypedWrite<T> withNumShards(int numShards) {
+    public TypedWrite<UserT> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
@@ -629,7 +663,7 @@ public class TextIO {
      *
      * <p>This is equivalent to {@code 
.withNumShards(1).withShardNameTemplate("")}
      */
-    public TypedWrite<T> withoutSharding() {
+    public TypedWrite<UserT> withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
@@ -638,7 +672,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured header.
      */
-    public TypedWrite<T> withHeader(@Nullable String header) {
+    public TypedWrite<UserT> withHeader(@Nullable String header) {
       return toBuilder().setHeader(header).build();
     }
 
@@ -647,7 +681,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured footer.
      */
-    public TypedWrite<T> withFooter(@Nullable String footer) {
+    public TypedWrite<UserT> withFooter(@Nullable String footer) {
       return toBuilder().setFooter(footer).build();
     }
 
@@ -658,7 +692,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will reset the value to the default value 
mentioned above.
      */
-    public TypedWrite<T> withWritableByteChannelFactory(
+    public TypedWrite<UserT> withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
       return 
toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
     }
@@ -669,36 +703,58 @@ public class TextIO {
      * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will 
be generated using
      * {@link FilenamePolicy#windowedFilename}. See also {@link 
WriteFiles#withWindowedWrites()}.
      */
-    public TypedWrite<T> withWindowedWrites() {
+    public TypedWrite<UserT> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }
 
-    private DynamicDestinations<T, ?> resolveDynamicDestinations() {
-      DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
+    private DynamicDestinations<UserT, ?, String> resolveDynamicDestinations() 
{
+      DynamicDestinations<UserT, ?, String> dynamicDestinations = 
getDynamicDestinations();
       if (dynamicDestinations == null) {
-        FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
-        if (usedFilenamePolicy == null) {
-          usedFilenamePolicy =
-              DefaultFilenamePolicy.fromStandardParameters(
-                  getFilenamePrefix(),
-                  getShardTemplate(),
-                  getFilenameSuffix(),
-                  getWindowedWrites());
+        if (getDestinationFunction() != null) {
+          dynamicDestinations =
+              DynamicFileDestinations.toDefaultPolicies(
+                  getDestinationFunction(), getEmptyDestination(), 
getFormatFunction());
+        } else {
+          FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+          if (usedFilenamePolicy == null) {
+            usedFilenamePolicy =
+                DefaultFilenamePolicy.fromStandardParameters(
+                    getFilenamePrefix(),
+                    getShardTemplate(),
+                    getFilenameSuffix(),
+                    getWindowedWrites());
+          }
+          dynamicDestinations =
+              DynamicFileDestinations.constant(usedFilenamePolicy, 
getFormatFunction());
         }
-        dynamicDestinations = 
DynamicFileDestinations.constant(usedFilenamePolicy);
       }
       return dynamicDestinations;
     }
 
     @Override
-    public PDone expand(PCollection<T> input) {
+    public PDone expand(PCollection<UserT> input) {
       checkState(
           getFilenamePrefix() != null || getTempDirectory() != null,
           "Need to set either the filename prefix or the tempDirectory of a 
TextIO.Write "
               + "transform.");
-      checkState(
-          getFilenamePolicy() == null || getDynamicDestinations() == null,
-          "Cannot specify both a filename policy and dynamic destinations");
+
+      List<?> allToArgs =
+          Lists.newArrayList(
+              getFilenamePolicy(),
+              getDynamicDestinations(),
+              getFilenamePrefix(),
+              getDestinationFunction());
+      checkArgument(
+          1 == Iterables.size(Iterables.filter(allToArgs, 
Predicates.notNull())),
+          "Exactly one of filename policy, dynamic destinations, filename 
prefix, or destination "
+              + "function must be set");
+
+      if (getDynamicDestinations() != null) {
+        checkArgument(
+            getFormatFunction() == null,
+            "A format function should not be specified "
+                + "with DynamicDestinations. Use 
DynamicDestinations.formatRecord instead");
+      }
       if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
         checkState(
             getShardTemplate() == null && getFilenameSuffix() == null,
@@ -709,20 +765,20 @@ public class TextIO {
     }
 
     public <DestinationT> PDone expandTyped(
-        PCollection<T> input, DynamicDestinations<T, DestinationT> 
dynamicDestinations) {
+        PCollection<UserT> input,
+        DynamicDestinations<UserT, DestinationT, String> dynamicDestinations) {
       ValueProvider<ResourceId> tempDirectory = getTempDirectory();
       if (tempDirectory == null) {
         tempDirectory = getFilenamePrefix();
       }
-      WriteFiles<T, DestinationT, String> write =
+      WriteFiles<UserT, DestinationT, String> write =
           WriteFiles.to(
               new TextSink<>(
                   tempDirectory,
                   dynamicDestinations,
                   getHeader(),
                   getFooter(),
-                  getWritableByteChannelFactory()),
-              getFormatFunction());
+                  getWritableByteChannelFactory()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -774,7 +830,7 @@ public class TextIO {
     @VisibleForTesting TypedWrite<String> inner;
 
     Write() {
-      this(TextIO.writeCustomType(SerializableFunctions.<String>identity()));
+      this(TextIO.<String>writeCustomType());
     }
 
     Write(TypedWrite<String> inner) {
@@ -783,43 +839,53 @@ public class TextIO {
 
     /** See {@link TypedWrite#to(String)}. */
     public Write to(String filenamePrefix) {
-      return new Write(inner.to(filenamePrefix));
+      return new Write(
+          
inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write to(ResourceId filenamePrefix) {
-      return new Write(inner.to(filenamePrefix));
+      return new Write(
+          
inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(ValueProvider)}. */
     public Write to(ValueProvider<String> outputPrefix) {
-      return new Write(inner.to(outputPrefix));
+      return new Write(
+          
inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#toResource(ValueProvider)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
-      return new Write(inner.toResource(filenamePrefix));
+      return new Write(
+          inner
+              .toResource(filenamePrefix)
+              .withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(FilenamePolicy)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write to(FilenamePolicy filenamePolicy) {
-      return new Write(inner.to(filenamePolicy));
+      return new Write(
+          
inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(DynamicDestinations)}. */
     @Experimental(Kind.FILESYSTEM)
-    public Write to(DynamicDestinations<String, ?> dynamicDestinations) {
-      return new Write(inner.to(dynamicDestinations));
+    public Write to(DynamicDestinations<String, ?, String> 
dynamicDestinations) {
+      return new Write(inner.to(dynamicDestinations).withFormatFunction(null));
     }
 
     /** See {@link TypedWrite#to(SerializableFunction, Params)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write to(
         SerializableFunction<String, Params> destinationFunction, Params 
emptyDestination) {
-      return new Write(inner.to(destinationFunction, emptyDestination));
+      return new Write(
+          inner
+              .to(destinationFunction, emptyDestination)
+              .withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index b57b28c..387e0ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.util.MimeTypes;
  * '\n'} represented in {@code UTF-8} format as the record separator. Each 
record (including the
  * last) is terminated.
  */
-class TextSink<UserT, DestinationT> extends FileBasedSink<String, 
DestinationT> {
+class TextSink<UserT, DestinationT> extends FileBasedSink<UserT, DestinationT, 
String> {
   @Nullable private final String header;
   @Nullable private final String footer;
 
   TextSink(
       ValueProvider<ResourceId> baseOutputFilename,
-      DynamicDestinations<UserT, DestinationT> dynamicDestinations,
+      DynamicDestinations<UserT, DestinationT, String> dynamicDestinations,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
@@ -50,13 +50,13 @@ class TextSink<UserT, DestinationT> extends 
FileBasedSink<String, DestinationT>
   }
 
   @Override
-  public WriteOperation<String, DestinationT> createWriteOperation() {
+  public WriteOperation<DestinationT, String> createWriteOperation() {
     return new TextWriteOperation<>(this, header, footer);
   }
 
   /** A {@link WriteOperation WriteOperation} for text files. */
   private static class TextWriteOperation<DestinationT>
-      extends WriteOperation<String, DestinationT> {
+      extends WriteOperation<DestinationT, String> {
     @Nullable private final String header;
     @Nullable private final String footer;
 
@@ -67,20 +67,20 @@ class TextSink<UserT, DestinationT> extends 
FileBasedSink<String, DestinationT>
     }
 
     @Override
-    public Writer<String, DestinationT> createWriter() throws Exception {
+    public Writer<DestinationT, String> createWriter() throws Exception {
       return new TextWriter<>(this, header, footer);
     }
   }
 
   /** A {@link Writer Writer} for text files. */
-  private static class TextWriter<DestinationT> extends Writer<String, 
DestinationT> {
+  private static class TextWriter<DestinationT> extends Writer<DestinationT, 
String> {
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
     private OutputStreamWriter out;
 
     public TextWriter(
-        WriteOperation<String, DestinationT> writeOperation,
+        WriteOperation<DestinationT, String> writeOperation,
         @Nullable String header,
         @Nullable String footer) {
       super(writeOperation, MimeTypes.TEXT);

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index d8d7478..85c5652 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -76,7 +75,9 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -121,9 +122,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
 
   static final int UNKNOWN_SHARDNUM = -1;
-  private FileBasedSink<OutputT, DestinationT> sink;
-  private SerializableFunction<UserT, OutputT> formatFunction;
-  private WriteOperation<OutputT, DestinationT> writeOperation;
+  private FileBasedSink<UserT, DestinationT, OutputT> sink;
+  private WriteOperation<DestinationT, OutputT> writeOperation;
   // This allows the number of shards to be dynamically computed based on the 
input
   // PCollection.
   @Nullable private final PTransform<PCollection<UserT>, 
PCollectionView<Integer>> computeNumShards;
@@ -133,37 +133,44 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private final ValueProvider<Integer> numShardsProvider;
   private final boolean windowedWrites;
   private int maxNumWritersPerBundle;
+  // This is the set of side inputs used by this transform. This is usually 
populated by the users's
+  // DynamicDestinations object.
+  private final List<PCollectionView<?>> sideInputs;
 
   /**
    * Creates a {@link WriteFiles} transform that writes to the given {@link 
FileBasedSink}, letting
    * the runner control how many different shards are produced.
    */
   public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, 
OutputT> to(
-      FileBasedSink<OutputT, DestinationT> sink,
-      SerializableFunction<UserT, OutputT> formatFunction) {
+      FileBasedSink<UserT, DestinationT, OutputT> sink) {
     checkNotNull(sink, "sink");
     return new WriteFiles<>(
         sink,
-        formatFunction,
         null /* runner-determined sharding */,
         null,
         false,
-        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
+        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
+        sink.getDynamicDestinations().getSideInputs());
   }
 
   private WriteFiles(
-      FileBasedSink<OutputT, DestinationT> sink,
-      SerializableFunction<UserT, OutputT> formatFunction,
+      FileBasedSink<UserT, DestinationT, OutputT> sink,
       @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> 
computeNumShards,
       @Nullable ValueProvider<Integer> numShardsProvider,
       boolean windowedWrites,
-      int maxNumWritersPerBundle) {
+      int maxNumWritersPerBundle,
+      List<PCollectionView<?>> sideInputs) {
     this.sink = sink;
-    this.formatFunction = checkNotNull(formatFunction);
     this.computeNumShards = computeNumShards;
     this.numShardsProvider = numShardsProvider;
     this.windowedWrites = windowedWrites;
     this.maxNumWritersPerBundle = maxNumWritersPerBundle;
+    this.sideInputs = sideInputs;
+  }
+
+  @Override
+  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+    return PCollectionViews.toAdditionalInputs(sideInputs);
   }
 
   @Override
@@ -207,15 +214,10 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   }
 
   /** Returns the {@link FileBasedSink} associated with this PTransform. */
-  public FileBasedSink<OutputT, DestinationT> getSink() {
+  public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
     return sink;
   }
 
-  /** Returns the the format function that maps the user type to the record 
written to files. */
-  public SerializableFunction<UserT, OutputT> getFormatFunction() {
-    return formatFunction;
-  }
-
   /**
    * Returns whether or not to perform windowed writes.
    */
@@ -266,11 +268,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       ValueProvider<Integer> numShardsProvider) {
     return new WriteFiles<>(
         sink,
-        formatFunction,
         computeNumShards,
         numShardsProvider,
         windowedWrites,
-        maxNumWritersPerBundle);
+        maxNumWritersPerBundle,
+        sideInputs);
   }
 
   /** Set the maximum number of writers created in a bundle before spilling to 
shuffle. */
@@ -278,11 +280,22 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       int maxNumWritersPerBundle) {
     return new WriteFiles<>(
         sink,
-        formatFunction,
         computeNumShards,
         numShardsProvider,
         windowedWrites,
-        maxNumWritersPerBundle);
+        maxNumWritersPerBundle,
+        sideInputs);
+  }
+
+  public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(
+      List<PCollectionView<?>> sideInputs) {
+    return new WriteFiles<>(
+        sink,
+        computeNumShards,
+        numShardsProvider,
+        windowedWrites,
+        maxNumWritersPerBundle,
+        sideInputs);
   }
 
   /**
@@ -297,7 +310,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     checkNotNull(
         sharding, "Cannot provide null sharding. Use 
withRunnerDeterminedSharding() instead");
     return new WriteFiles<>(
-        sink, formatFunction, sharding, null, windowedWrites, 
maxNumWritersPerBundle);
+        sink, sharding, null, windowedWrites, maxNumWritersPerBundle, 
sideInputs);
   }
 
   /**
@@ -305,8 +318,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * runner-determined sharding.
    */
   public WriteFiles<UserT, DestinationT, OutputT> 
withRunnerDeterminedSharding() {
-    return new WriteFiles<>(
-        sink, formatFunction, null, null, windowedWrites, 
maxNumWritersPerBundle);
+    return new WriteFiles<>(sink, null, null, windowedWrites, 
maxNumWritersPerBundle, sideInputs);
   }
 
   /**
@@ -323,7 +335,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    */
   public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
     return new WriteFiles<>(
-        sink, formatFunction, computeNumShards, numShardsProvider, true, 
maxNumWritersPerBundle);
+        sink, computeNumShards, numShardsProvider, true, 
maxNumWritersPerBundle, sideInputs);
   }
 
   private static class WriterKey<DestinationT> {
@@ -374,7 +386,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     private final Coder<DestinationT> destinationCoder;
     private final boolean windowedWrites;
 
-    private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> 
writers;
+    private Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> 
writers;
     private int spilledShardNum = UNKNOWN_SHARDNUM;
 
     WriteBundles(
@@ -394,6 +406,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
+      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
       PaneInfo paneInfo = c.pane();
       // If we are doing windowed writes, we need to ensure that we have 
separate files for
       // data in different windows/panes. Similar for dynamic writes, make 
sure that different
@@ -402,7 +415,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // the map will only have a single element.
       DestinationT destination = 
sink.getDynamicDestinations().getDestination(c.element());
       WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), 
destination);
-      Writer<OutputT, DestinationT> writer = writers.get(key);
+      Writer<DestinationT, OutputT> writer = writers.get(key);
       if (writer == null) {
         if (writers.size() <= maxNumWritersPerBundle) {
           String uuid = UUID.randomUUID().toString();
@@ -436,14 +449,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           return;
         }
       }
-      writeOrClose(writer, formatFunction.apply(c.element()));
+      writeOrClose(writer, 
getSink().getDynamicDestinations().formatRecord(c.element()));
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> 
entry :
+      for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> 
entry :
           writers.entrySet()) {
-        Writer<OutputT, DestinationT> writer = entry.getValue();
+        Writer<DestinationT, OutputT> writer = entry.getValue();
         FileResult<DestinationT> result;
         try {
           result = writer.close();
@@ -478,13 +491,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
+      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
       // Since we key by a 32-bit hash of the destination, there might be 
multiple destinations
       // in this iterable. The number of destinations is generally very small 
(1000s or less), so
       // there will rarely be hash collisions.
-      Map<DestinationT, Writer<OutputT, DestinationT>> writers = 
Maps.newHashMap();
+      Map<DestinationT, Writer<DestinationT, OutputT>> writers = 
Maps.newHashMap();
       for (UserT input : c.element().getValue()) {
         DestinationT destination = 
sink.getDynamicDestinations().getDestination(input);
-        Writer<OutputT, DestinationT> writer = writers.get(destination);
+        Writer<DestinationT, OutputT> writer = writers.get(destination);
         if (writer == null) {
           LOG.debug("Opening writer for write operation {}", writeOperation);
           writer = writeOperation.createWriter();
@@ -501,12 +515,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           LOG.debug("Done opening writer");
           writers.put(destination, writer);
         }
-        writeOrClose(writer, formatFunction.apply(input));
-        }
+        writeOrClose(writer, 
getSink().getDynamicDestinations().formatRecord(input));
+      }
 
       // Close all writers.
-      for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : 
writers.entrySet()) {
-        Writer<OutputT, DestinationT> writer = entry.getValue();
+      for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : 
writers.entrySet()) {
+        Writer<DestinationT, OutputT> writer = entry.getValue();
         FileResult<DestinationT> result;
         try {
           // Close the writer; if this throws let the error propagate.
@@ -526,8 +540,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private static <OutputT, DestinationT> void writeOrClose(
-      Writer<OutputT, DestinationT> writer, OutputT t) throws Exception {
+  private static <DestinationT, OutputT> void writeOrClose(
+      Writer<DestinationT, OutputT> writer, OutputT t) throws Exception {
     try {
       writer.write(t);
     } catch (Exception e) {
@@ -678,6 +692,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           input.apply(
               writeName,
               ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, 
destinationCoder))
+                  .withSideInputs(sideInputs)
                   .withOutputTags(writtenRecordsTag, 
TupleTagList.of(unwrittedRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
           writeTuple
@@ -692,17 +707,18 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, 
UserT>create())
               .apply(
                   "WriteUnwritten",
-                  ParDo.of(new 
WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
+                  ParDo.of(new 
WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))
+                      .withSideInputs(sideInputs))
               .setCoder(FileResultCoder.of(shardedWindowCoder, 
destinationCoder));
       results =
           PCollectionList.of(writtenBundleFiles)
               .and(writtenGroupedFiles)
               .apply(Flatten.<FileResult<DestinationT>>pCollections());
     } else {
-      List<PCollectionView<?>> sideInputs = Lists.newArrayList();
+      List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList();
       if (computeNumShards != null) {
         numShardsView = input.apply(computeNumShards);
-        sideInputs.add(numShardsView);
+        shardingSideInputs.add(numShardsView);
       } else {
         numShardsView = null;
       }
@@ -715,7 +731,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                               numShardsView,
                               (numShardsView != null) ? null : 
numShardsProvider,
                               destinationCoder))
-                      .withSideInputs(sideInputs))
+                      .withSideInputs(shardingSideInputs))
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), 
input.getCoder()))
               .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, 
UserT>create());
       shardedWindowCoder =
@@ -728,7 +744,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       results =
           sharded.apply(
               "WriteShardedBundles",
-              ParDo.of(new 
WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
+              ParDo.of(new 
WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))
+                  .withSideInputs(sideInputs));
     }
     results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
 
@@ -773,11 +790,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     } else {
       final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
           results.apply(View.<FileResult<DestinationT>>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
+      ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs =
           ImmutableList.<PCollectionView<?>>builder().add(resultsView);
       if (numShardsView != null) {
-        sideInputs.add(numShardsView);
+        finalizeSideInputs.add(numShardsView);
       }
+      finalizeSideInputs.addAll(sideInputs);
 
       // Finalize the write in another do-once ParDo on the singleton 
collection containing the
       // Writer. The results from the per-bundle writes are given as an 
Iterable side input.
@@ -794,7 +812,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                   new DoFn<Void, Integer>() {
                     @ProcessElement
                     public void processElement(ProcessContext c) throws 
Exception {
-                      LOG.info("Finalizing write operation {}.", 
writeOperation);
+                      
sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
                       // We must always output at least 1 shard, and honor 
user-specified numShards
                       // if
                       // set.
@@ -827,7 +845,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                       writeOperation.removeTemporaryFiles(tempFiles);
                     }
                   })
-              .withSideInputs(sideInputs.build()));
+              .withSideInputs(finalizeSideInputs.build()));
     }
     return PDone.in(input.getPipeline());
   }
@@ -857,7 +875,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           minShardsNeeded,
           destination);
       for (int i = 0; i < extraShardsNeeded; ++i) {
-        Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
+        Writer<DestinationT, OutputT> writer = writeOperation.createWriter();
         // Currently this code path is only called in the unwindowed case.
         writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, 
destination);
         FileResult<DestinationT> emptyWrite = writer.close();

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 154ff5a..a96b6be 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -30,9 +30,11 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -41,6 +43,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
@@ -48,6 +51,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
@@ -55,6 +59,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -68,6 +73,7 @@ import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -77,6 +83,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -535,17 +542,147 @@ public class AvroIOTest {
     assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
   }
 
+  private static final String SCHEMA_TEMPLATE_STRING =
+      "{\"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"TestTemplateSchema$$\",\n"
+          + " \"fields\": [\n"
+          + "     {\"name\": \"$$full\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"$$suffix\", \"type\": [\"string\", \"null\"]}\n"
+          + " ]\n"
+          + "}";
+
+  private static String schemaFromPrefix(String prefix) {
+    return SCHEMA_TEMPLATE_STRING.replace("$$", prefix);
+  }
+
+  private static GenericRecord createRecord(String record, String prefix, Schema schema) {
+    GenericRecord genericRecord = new GenericData.Record(schema);
+    genericRecord.put(prefix + "full", record);
+    genericRecord.put(prefix + "suffix", record.substring(1));
+    return genericRecord;
+  }
+
+  private static class TestDynamicDestinations
+      extends DynamicAvroDestinations<String, String, GenericRecord> {
+    ResourceId baseDir;
+    PCollectionView<Map<String, String>> schemaView;
+
+    TestDynamicDestinations(ResourceId baseDir, PCollectionView<Map<String, String>> schemaView) {
+      this.baseDir = baseDir;
+      this.schemaView = schemaView;
+    }
+
+    @Override
+    public Schema getSchema(String destination) {
+      // Return a per-destination schema.
+      String schema = sideInput(schemaView).get(destination);
+      return new Schema.Parser().parse(schema);
+    }
+
+    @Override
+    public List<PCollectionView<?>> getSideInputs() {
+      return ImmutableList.<PCollectionView<?>>of(schemaView);
+    }
+
+    @Override
+    public GenericRecord formatRecord(String record) {
+      String prefix = record.substring(0, 1);
+      return createRecord(record, prefix, getSchema(prefix));
+    }
+
+    @Override
+    public String getDestination(String element) {
+      // Destination is based on first character of string.
+      return element.substring(0, 1);
+    }
+
+    @Override
+    public String getDefaultDestination() {
+      return "";
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(String destination) {
+      return DefaultFilenamePolicy.fromStandardParameters(
+          StaticValueProvider.of(
+              baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)),
+          null,
+          null,
+          false);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinations() throws Exception {
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations")
+                .toString(),
+            true);
+
+    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
+    List<GenericRecord> expectedElements = Lists.newArrayListWithExpectedSize(elements.size());
+    Map<String, String> schemaMap = Maps.newHashMap();
+    for (String element : elements) {
+      String prefix = element.substring(0, 1);
+      String jsonSchema = schemaFromPrefix(prefix);
+      schemaMap.put(prefix, jsonSchema);
+      expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
+    }
+    PCollectionView<Map<String, String>> schemaView =
+        writePipeline
+            .apply("createSchemaView", Create.of(schemaMap))
+            .apply(View.<String, String>asMap());
+
+    PCollection<String> input =
+        writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
+    input.apply(
+        AvroIO.<String>writeCustomTypeToGenericRecords()
+            .to(new TestDynamicDestinations(baseDir, schemaView))
+            .withoutSharding()
+            .withTempDirectory(baseDir));
+    writePipeline.run();
+
+    // Validate that the data written matches the expected elements in the expected order.
+
+    List<String> prefixes = Lists.newArrayList();
+    for (String element : elements) {
+      prefixes.add(element.substring(0, 1));
+    }
+    prefixes = ImmutableSet.copyOf(prefixes).asList();
+
+    List<GenericRecord> actualElements = new ArrayList<>();
+    for (String prefix : prefixes) {
+      File expectedFile =
+          new File(
+              baseDir
+                  .resolve(
+                      "file_" + prefix + ".txt-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
+                  .toString());
+      assertTrue("Expected output file " + expectedFile.getAbsolutePath(), expectedFile.exists());
+      Schema schema = new Schema.Parser().parse(schemaFromPrefix(prefix));
+      try (DataFileReader<GenericRecord> reader =
+          new DataFileReader<>(expectedFile, new GenericDatumReader<GenericRecord>(schema))) {
+        Iterators.addAll(actualElements, reader);
+      }
+      expectedFile.delete();
+    }
+    assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
+  }
+
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
     AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz");
-    assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
+    assertEquals(CodecFactory.deflateCodec(6).toString(), write.inner.getCodec().toString());
   }
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
     AvroIO.Write<String> write =
         
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
-    assertEquals(SNAPPY_CODEC, write.getCodec().toString());
+    assertEquals(SNAPPY_CODEC, write.inner.getCodec().toString());
   }
 
   @Test
@@ -556,7 +693,7 @@ public class AvroIOTest {
 
     assertEquals(
         CodecFactory.deflateCodec(9).toString(),
-        SerializableUtils.clone(write.getCodec()).getCodec().toString());
+        SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -567,7 +704,7 @@ public class AvroIOTest {
 
     assertEquals(
         CodecFactory.xzCodec(9).toString(),
-        SerializableUtils.clone(write.getCodec()).getCodec().toString());
+        SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -618,7 +755,8 @@ public class AvroIOTest {
 
     String shardNameTemplate =
         firstNonNull(
-            write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+            write.inner.getShardTemplate(),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -710,7 +848,13 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
     assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
     assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
-    assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            "schema",
+            "{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io"
+                + ".AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"},"
+                + "{\"name\":\"stringField\",\"type\":\"string\"}]}"));
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index a6ad746..ff30e33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -231,7 +231,7 @@ public class FileBasedSinkTest {
         SimpleSink.makeSimpleSink(
             getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
 
-    WriteOperation<String, Void> writeOp =
+    WriteOperation<Void, String> writeOp =
         new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
@@ -482,11 +482,11 @@ public class FileBasedSinkTest {
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
     ResourceId root = getBaseOutputDirectory();
-    WriteOperation<String, Void> writeOp =
+    WriteOperation<Void, String> writeOp =
         SimpleSink.makeSimpleSink(
                 root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
             .createWriteOperation();
-    final Writer<String, Void> writer = writeOp.createWriter();
+    final Writer<Void, String> writer = writeOp.createWriter();
     final ResourceId expectedFile =
         writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 9196178..382898d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -28,10 +28,10 @@ import org.apache.beam.sdk.util.MimeTypes;
 /**
  * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer.
  */
-class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
+class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, String> {
   public SimpleSink(
       ResourceId tempDirectory,
-      DynamicDestinations<String, DestinationT> dynamicDestinations,
+      DynamicDestinations<String, DestinationT, String> dynamicDestinations,
       WritableByteChannelFactory writableByteChannelFactory) {
     super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
   }
@@ -50,7 +50,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
       String shardTemplate,
       String suffix,
       WritableByteChannelFactory writableByteChannelFactory) {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -67,7 +67,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
   }
 
   static final class SimpleWriteOperation<DestinationT>
-      extends WriteOperation<String, DestinationT> {
+      extends WriteOperation<DestinationT, String> {
     public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
       super(sink, tempOutputDirectory);
     }
@@ -82,7 +82,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
     }
   }
 
-  static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> {
+  static final class SimpleWriter<DestinationT> extends Writer<DestinationT, String> {
     static final String HEADER = "header";
     static final String FOOTER = "footer";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index a73ed7d..7f80c26 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -110,7 +110,8 @@ public class TextIOWriteTest {
         });
   }
 
-  static class TestDynamicDestinations extends FileBasedSink.DynamicDestinations<String, String> {
+  static class TestDynamicDestinations
+      extends FileBasedSink.DynamicDestinations<String, String, String> {
     ResourceId baseDir;
 
     TestDynamicDestinations(ResourceId baseDir) {
@@ -118,6 +119,11 @@ public class TextIOWriteTest {
     }
 
     @Override
+    public String formatRecord(String record) {
+      return record;
+    }
+
+    @Override
     public String getDestination(String element) {
       // Destination is based on first character of string.
       return element.substring(0, 1);
@@ -169,10 +175,7 @@ public class TextIOWriteTest {
 
     List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
     PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
-    input.apply(
-        TextIO.write()
-            .to(new TestDynamicDestinations(baseDir))
-            .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+    input.apply(TextIO.write().to(new TestDynamicDestinations(baseDir)).withTempDirectory(baseDir));
     p.run();
 
     assertOutputFiles(
@@ -268,8 +271,14 @@ public class TextIOWriteTest {
             new UserWriteType("caab", "sixth"));
     PCollection<UserWriteType> input = p.apply(Create.of(elements));
     input.apply(
-        TextIO.writeCustomType(new SerializeUserWrite())
-            .to(new UserWriteDestination(baseDir), new DefaultFilenamePolicy.Params())
+        TextIO.<UserWriteType>writeCustomType()
+            .to(
+                new UserWriteDestination(baseDir),
+                new DefaultFilenamePolicy.Params()
+                    .withBaseFilename(
+                        baseDir.resolve(
+                            "empty", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)))
+            .withFormatFunction(new SerializeUserWrite())
             
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
     p.run();
 

Reply via email to