Add windowing support to FileBasedSink

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6addc95f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6addc95f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6addc95f

Branch: refs/heads/master
Commit: 6addc95f0300a2e03109d4ad7ee93727d0a3b7b2
Parents: 570d0e2
Author: Reuven Lax <[email protected]>
Authored: Thu Mar 9 09:45:35 2017 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Wed Apr 5 08:57:21 2017 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |   1 -
 .../apache/beam/examples/WindowedWordCount.java |  34 +-
 .../examples/common/WriteOneFilePerWindow.java  |  91 ++++
 .../examples/common/WriteWindowedFilesDoFn.java |  77 ----
 .../beam/examples/WindowedWordCountIT.java      |  41 +-
 .../core/construction/PTransformMatchers.java   |   3 +-
 .../direct/WriteWithShardingFactory.java        |   6 +-
 .../streaming/io/UnboundedFlinkSink.java        |  20 +-
 .../beam/runners/flink/WriteSinkITCase.java     |  23 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 157 +++++--
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 429 +++++++++++++++----
 .../main/java/org/apache/beam/sdk/io/Sink.java  |  55 ++-
 .../java/org/apache/beam/sdk/io/TextIO.java     |  98 ++++-
 .../main/java/org/apache/beam/sdk/io/Write.java | 377 +++++++++-------
 .../java/org/apache/beam/sdk/io/XmlSink.java    |   6 +-
 .../beam/sdk/testing/TestPipelineOptions.java   |   5 +
 .../beam/sdk/util/FileIOChannelFactory.java     |  23 +-
 .../beam/sdk/util/GcsIOChannelFactory.java      |   3 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  21 +-
 .../apache/beam/sdk/util/IOChannelFactory.java  |   3 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 146 ++++++-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  94 ++--
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  49 ++-
 .../org/apache/beam/sdk/io/XmlSinkTest.java     |   8 +-
 .../beam/sdk/testing/TestPipelineTest.java      |  17 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   |  24 +-
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      |   2 +-
 27 files changed, 1295 insertions(+), 518 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 2b18130..021a819 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -209,7 +209,6 @@
                 <configuration>
                   <includes>
                     <include>WordCountIT.java</include>
-                    <include>WindowedWordCountIT.java</include>
                   </includes>
                   <parallel>all</parallel>
                   <threadCount>4</threadCount>

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java 
b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 5c19454..d88de54 100644
--- 
a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExampleOptions;
-import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.examples.common.WriteOneFilePerWindow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.TextIO;
@@ -31,11 +31,9 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -203,33 +201,13 @@ public class WindowedWordCount {
     PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new 
WordCount.CountWords());
 
     /**
-     * Concept #5: Customize the output format using windowing information
-     *
-     * <p>At this point, the data is organized by window. We're writing text 
files and and have no
-     * late data, so for simplicity we can use the window as the key and 
{@link GroupByKey} to get
-     * one output file per window. (if we had late data this key would not be 
unique)
-     *
-     * <p>To access the window in a {@link DoFn}, add a {@link BoundedWindow} 
parameter. This will
-     * be automatically detected and populated with the window for the current 
element.
-     */
-    PCollection<KV<IntervalWindow, KV<String, Long>>> keyedByWindow =
-        wordCounts.apply(
-            ParDo.of(
-                new DoFn<KV<String, Long>, KV<IntervalWindow, KV<String, 
Long>>>() {
-                  @ProcessElement
-                  public void processElement(ProcessContext context, 
IntervalWindow window) {
-                    context.output(KV.of(window, context.element()));
-                  }
-                }));
-
-    /**
-     * Concept #6: Format the results and write to a sharded file partitioned 
by window, using a
+     * Concept #5: Format the results and write to a sharded file partitioned 
by window, using a
      * simple ParDo operation. Because there may be failures followed by 
retries, the
      * writes must be idempotent, but the details of writing to files is 
elided here.
      */
-    keyedByWindow
-        .apply(GroupByKey.<IntervalWindow, KV<String, Long>>create())
-        .apply(ParDo.of(new WriteWindowedFilesDoFn(output)));
+    wordCounts
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(new WriteOneFilePerWindow(output));
 
     PipelineResult result = pipeline.run();
     try {

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
new file mode 100644
index 0000000..2ed8a74
--- /dev/null
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.common;
+
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+
+/**
+ * A {@link PTransform} that writes elements to files with names deterministically 
derived from the lower
+ * and upper bounds of the window each element belongs to (an {@link IntervalWindow}).
+ *
+ * <p>This is test utility code, not for end-users, so examples can be focused 
on their primary
+ * lessons.
+ */
+public class WriteOneFilePerWindow extends PTransform<PCollection<String>, 
PDone> {
+
+  private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
+  private String filenamePrefix;
+
+  public WriteOneFilePerWindow(String filenamePrefix) {
+    this.filenamePrefix = filenamePrefix;
+  }
+
+  @Override
+  public PDone expand(PCollection<String> input) {
+    return input.apply(
+        TextIO.Write.to(new 
PerWindowFiles(filenamePrefix)).withWindowedWrites().withNumShards(3));
+  }
+
+  /**
+   * A {@link FilenamePolicy} produces a base file name for a write based on 
metadata about the data
+   * being written. This always includes the shard number and the total number 
of shards. For
+   * windowed writes, it also includes the window and pane index (a sequence 
number assigned to each
+   * trigger firing).
+   */
+  public static class PerWindowFiles extends FilenamePolicy {
+
+    private final String output;
+
+    public PerWindowFiles(String output) {
+      this.output = output;
+    }
+
+    @Override
+    public ValueProvider<String> getBaseOutputFilenameProvider() {
+      return StaticValueProvider.of(output);
+    }
+
+    public String   filenamePrefixForWindow(IntervalWindow window) {
+      return String.format(
+          "%s-%s-%s", output, formatter.print(window.start()), 
formatter.print(window.end()));
+    }
+
+    @Override
+    public String windowedFilename(WindowedContext context) {
+      IntervalWindow window = (IntervalWindow) context.getWindow();
+      return String.format(
+          "%s-%s-of-%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), 
context.getNumShards());
+    }
+
+    @Override
+    public String unwindowedFilename(Context context) {
+      throw new UnsupportedOperationException("Unsupported.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
 
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
deleted file mode 100644
index cd6baad..0000000
--- 
a/examples/java/src/main/java/org/apache/beam/examples/common/WriteWindowedFilesDoFn.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.examples.common;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.charset.StandardCharsets;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.KV;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-/**
- * A {@link DoFn} that writes elements to files with names deterministically 
derived from the lower
- * and upper bounds of their key (an {@link IntervalWindow}).
- *
- * <p>This is test utility code, not for end-users, so examples can be focused
- * on their primary lessons.
- */
-public class WriteWindowedFilesDoFn
-    extends DoFn<KV<IntervalWindow, Iterable<KV<String, Long>>>, Void> {
-
-  static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-  static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-
-  private static DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();
-
-  private final String output;
-
-  public WriteWindowedFilesDoFn(String output) {
-    this.output = output;
-  }
-
-  @VisibleForTesting
-  public static String fileForWindow(String output, IntervalWindow window) {
-    return String.format(
-        "%s-%s-%s", output, formatter.print(window.start()), 
formatter.print(window.end()));
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext context) throws Exception {
-    // Build a file name from the window
-    IntervalWindow window = context.element().getKey();
-    String outputShard = fileForWindow(output, window);
-
-    // Open the file and write all the values
-    IOChannelFactory factory = IOChannelUtils.getFactory(outputShard);
-    OutputStream out = Channels.newOutputStream(factory.create(outputShard, 
"text/plain"));
-    for (KV<String, Long> wordCount : context.element().getValue()) {
-      STRING_CODER.encode(
-          wordCount.getKey() + ": " + wordCount.getValue(), out, 
Coder.Context.OUTER);
-      out.write(NEWLINE);
-    }
-    out.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git 
a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java 
b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 703f836..857f1d3 100644
--- 
a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ 
b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -23,13 +23,14 @@ import com.google.api.client.util.Sleeper;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.examples.common.WriteWindowedFilesDoFn;
+import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
@@ -42,6 +43,7 @@ import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.ExplicitShardedFile;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.NumberedShardedFile;
 import org.apache.beam.sdk.util.ShardedFile;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
@@ -64,7 +66,7 @@ public class WindowedWordCountIT {
   @Rule public TestName testName = new TestName();
 
   private static final String DEFAULT_INPUT =
-      "gs://apache-beam-samples/shakespeare/winterstale-personae";
+      "gs://apache-beam-samples/shakespeare/sonnets.txt";
   static final int MAX_READ_RETRIES = 4;
   static final Duration DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L);
   static final FluentBackoff BACK_OFF_FACTORY =
@@ -130,14 +132,18 @@ public class WindowedWordCountIT {
 
     String outputPrefix = options.getOutput();
 
-    List<String> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+    PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix);
+
+    List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
+
     for (int startMinute : ImmutableList.of(0, 10, 20, 30, 40, 50)) {
-      Instant windowStart =
+      final Instant windowStart =
           new 
Instant(options.getMinTimestampMillis()).plus(Duration.standardMinutes(startMinute));
       expectedOutputFiles.add(
-          WriteWindowedFilesDoFn.fileForWindow(
-              outputPrefix,
-              new IntervalWindow(windowStart, 
windowStart.plus(Duration.standardMinutes(10)))));
+          new NumberedShardedFile(
+              filenamePolicy.filenamePrefixForWindow(
+                  new IntervalWindow(
+                      windowStart, 
windowStart.plus(Duration.standardMinutes(10)))) + "*"));
     }
 
     ShardedFile inputFile = new 
ExplicitShardedFile(Collections.singleton(options.getInputFile()));
@@ -157,7 +163,7 @@ public class WindowedWordCountIT {
     }
 
     options.setOnSuccessMatcher(
-        new WordCountsMatcher(expectedWordCounts, new 
ExplicitShardedFile(expectedOutputFiles)));
+        new WordCountsMatcher(expectedWordCounts, expectedOutputFiles));
 
     WindowedWordCount.main(TestPipeline.convertToArgs(options));
   }
@@ -172,24 +178,28 @@ public class WindowedWordCountIT {
     private static final Logger LOG = 
LoggerFactory.getLogger(FileChecksumMatcher.class);
 
     private final SortedMap<String, Long> expectedWordCounts;
-    private final ShardedFile outputFile;
+    private final List<ShardedFile> outputFiles;
     private SortedMap<String, Long> actualCounts;
 
-    public WordCountsMatcher(SortedMap<String, Long> expectedWordCounts, 
ShardedFile outputFile) {
+    public WordCountsMatcher(
+        SortedMap<String, Long> expectedWordCounts, List<ShardedFile> 
outputFiles) {
       this.expectedWordCounts = expectedWordCounts;
-      this.outputFile = outputFile;
+      this.outputFiles = outputFiles;
     }
 
     @Override
     public boolean matchesSafely(PipelineResult pipelineResult) {
       try {
         // Load output data
-        List<String> lines =
-            outputFile.readFilesWithRetries(Sleeper.DEFAULT, 
BACK_OFF_FACTORY.backoff());
+        List<String> outputLines = new ArrayList<>();
+        for (ShardedFile outputFile : outputFiles) {
+          outputLines.addAll(
+              outputFile.readFilesWithRetries(Sleeper.DEFAULT, 
BACK_OFF_FACTORY.backoff()));
+        }
 
         // Since the windowing is nondeterministic we only check the sums
         actualCounts = new TreeMap<>();
-        for (String line : lines) {
+        for (String line : outputLines) {
           String[] splits = line.split(": ");
           String word = splits[0];
           long count = Long.parseLong(splits[1]);
@@ -205,7 +215,8 @@ public class WindowedWordCountIT {
         return actualCounts.equals(expectedWordCounts);
       } catch (Exception e) {
         throw new RuntimeException(
-            String.format("Failed to read from sharded output: %s", 
outputFile));
+            String.format("Failed to read from sharded output: %s due to 
exception",
+                outputFiles), e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index f4ae577..c4f1bd6 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -254,7 +254,8 @@ public class PTransformMatchers {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
         if (application.getTransform() instanceof Write) {
-          return ((Write) application.getTransform()).getSharding() == null;
+          Write write = (Write) application.getTransform();
+          return write.getSharding() == null && write.getNumShards() == null;
         }
         return false;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index 63122fe..1bf5839 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -35,6 +35,8 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
@@ -54,8 +56,7 @@ class WriteWithShardingFactory<InputT>
   @Override
   public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
       Write<InputT> transform) {
-
-      return transform.withSharding(new LogElementShardsWithDrift<InputT>());
+    return transform.withSharding(new LogElementShardsWithDrift<InputT>());
   }
 
   @Override
@@ -74,6 +75,7 @@ class WriteWithShardingFactory<InputT>
     @Override
     public PCollectionView<Integer> expand(PCollection<T> records) {
       return records
+          .apply(Window.<T>into(new GlobalWindows()))
           .apply("CountRecords", Count.<T>globally())
           .apply("GenerateShardCount", ParDo.of(new CalculateShardsFn()))
           .apply(View.<Integer>asSingleton());

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
index 301d841..af36b80 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -28,6 +28,8 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -63,6 +65,10 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
       }
 
       @Override
+      public void setWindowedWrites(boolean windowedWrites) {
+      }
+
+      @Override
       public void finalize(Iterable<Object> writerResults, PipelineOptions 
options)
           throws Exception {
 
@@ -141,7 +147,19 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
       public Writer<T, Object> createWriter(PipelineOptions options) throws 
Exception {
         return new Writer<T, Object>() {
           @Override
-          public void open(String uId) throws Exception {
+          public void openWindowed(String uId,
+                                   BoundedWindow window,
+                                   PaneInfo paneInfo,
+                                   int shard,
+                                   int numShards) throws Exception {
+          }
+
+          @Override
+          public void openUnwindowed(String uId, int shard, int numShards) 
throws Exception {
+          }
+
+          @Override
+          public void cleanup() throws Exception {
 
           }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 572c291..38b790e 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -33,6 +33,8 @@ import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -119,6 +121,11 @@ public class WriteSinkITCase extends JavaProgramTestBase {
       }
 
       @Override
+      public void setWindowedWrites(boolean windowedWrites) {
+
+      }
+
+      @Override
       public void finalize(Iterable<String> writerResults, PipelineOptions 
options)
           throws Exception {
 
@@ -142,13 +149,27 @@ public class WriteSinkITCase extends JavaProgramTestBase {
         private PrintWriter internalWriter;
 
         @Override
-        public void open(String uId) throws Exception {
+        public final void openWindowed(String uId,
+                                       BoundedWindow window,
+                                       PaneInfo paneInfo,
+                                       int shard,
+                                       int numShards) throws Exception {
+          throw new UnsupportedOperationException("Windowed writes not 
supported.");
+        }
+
+        @Override
+        public final void openUnwindowed(String uId, int shard, int numShards) 
throws Exception {
           Path path = new Path(resultPath + "/" + uId);
           FileSystem.get(new URI("file:///")).create(path, false);
           internalWriter = new PrintWriter(new File(path.toUri()));
         }
 
         @Override
+        public void cleanup() throws Exception {
+
+        }
+
+        @Override
         public void write(String value) throws Exception {
           internalWriter.println(value);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 96f0a50..a41c9f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
+
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
@@ -39,6 +40,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -85,11 +87,21 @@ import org.apache.beam.sdk.values.PDone;
  * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to} to specify
+ * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to(String)} to specify
  * the path of the file to write to (e.g., a local filename or sharded
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * {@code "gs://<bucket>/<filepath>"}). {@link 
AvroIO.Write#to(FilenamePolicy)} can also be used
+ * to specify a custom file naming policy.
+ *
+ * <p>By default, all input is put into the global window before writing. If 
per-window writes are
+ * desired - for example, when using a streaming runner -
+ * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and 
triggering to be
+ * preserved. When producing windowed writes, the number of output shards must 
be set explicitly
+ * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set 
this for you to a
+ * runner-chosen value, so you may not need to set it yourself. A
+ * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and 
triggers must produce
+ * unique filenames.
  *
  * <p>It is required to specify {@link AvroIO.Write#withSchema}. To
  * write specific records, such as Avro-generated classes, provide an
@@ -369,6 +381,14 @@ public class AvroIO {
     }
 
     /**
+     * Returns a {@link PTransform} that writes to the file(s) specified by 
the provided
+     * {@link FileBasedSink.FilenamePolicy}.
+     */
+    public static Bound<GenericRecord> to(FilenamePolicy filenamePolicy) {
+      return new Bound<>(GenericRecord.class).to(filenamePolicy);
+    }
+
+    /**
      * Returns a {@link PTransform} that writes to the file(s) with the
      * given filename suffix.
      */
@@ -496,6 +516,9 @@ public class AvroIO {
       final Schema schema;
       /** An option to indicate if output validation is desired. Default is 
true. */
       final boolean validate;
+      final boolean windowedWrites;
+      FilenamePolicy filenamePolicy;
+
       /**
        * The codec used to encode the blocks in the Avro file. String value 
drawn from those in
        * 
https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -515,7 +538,9 @@ public class AvroIO {
             null,
             true,
             DEFAULT_CODEC,
-            ImmutableMap.<String, Object>of());
+            ImmutableMap.<String, Object>of(),
+            false,
+            null);
       }
 
       Bound(
@@ -528,7 +553,9 @@ public class AvroIO {
           Schema schema,
           boolean validate,
           SerializableAvroCodecFactory codec,
-          Map<String, Object> metadata) {
+          Map<String, Object> metadata,
+          boolean windowedWrites,
+          FilenamePolicy filenamePolicy) {
         super(name);
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
@@ -538,6 +565,8 @@ public class AvroIO {
         this.schema = schema;
         this.validate = validate;
         this.codec = codec;
+        this.windowedWrites = windowedWrites;
+        this.filenamePolicy = filenamePolicy;
 
         Map<String, String> badKeys = Maps.newLinkedHashMap();
         for (Map.Entry<String, Object> entry : metadata.entrySet()) {
@@ -573,7 +602,25 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
+      }
+
+      public Bound<T> to(FilenamePolicy filenamePolicy) {
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec,
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -596,7 +643,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -625,7 +674,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -647,7 +698,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -670,7 +723,25 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
+      }
+
+      public Bound<T> withWindowedWrites() {
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            schema,
+            validate,
+            codec,
+            metadata,
+            true,
+            filenamePolicy);
       }
 
       /**
@@ -693,7 +764,9 @@ public class AvroIO {
             ReflectData.get().getSchema(type),
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -714,7 +787,9 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -749,7 +824,9 @@ public class AvroIO {
             schema,
             false,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -769,7 +846,9 @@ public class AvroIO {
             schema,
             validate,
             new SerializableAvroCodecFactory(codec),
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       /**
@@ -789,31 +868,49 @@ public class AvroIO {
             schema,
             validate,
             codec,
-            metadata);
+            metadata,
+            windowedWrites,
+            filenamePolicy);
       }
 
       @Override
       public PDone expand(PCollection<T> input) {
-        if (filenamePrefix == null) {
+        if (filenamePolicy == null && filenamePrefix == null) {
           throw new IllegalStateException(
               "need to set the filename prefix of an AvroIO.Write transform");
         }
+        if (filenamePolicy != null && filenamePrefix != null) {
+          throw new IllegalStateException(
+              "cannot set both a filename policy and a filename prefix");
+        }
         if (schema == null) {
           throw new IllegalStateException("need to set the schema of an 
AvroIO.Write transform");
         }
 
-        org.apache.beam.sdk.io.Write<T> write =
-            org.apache.beam.sdk.io.Write.to(
-                new AvroSink<>(
-                    filenamePrefix,
-                    filenameSuffix,
-                    shardTemplate,
-                    AvroCoder.of(type, schema),
-                    codec,
-                    metadata));
+        org.apache.beam.sdk.io.Write<T> write = null;
+        if (filenamePolicy != null) {
+          write = org.apache.beam.sdk.io.Write.to(
+              new AvroSink<>(
+                  filenamePolicy,
+                  AvroCoder.of(type, schema),
+                  codec,
+                  metadata));
+        } else {
+          write = org.apache.beam.sdk.io.Write.to(
+              new AvroSink<>(
+                  filenamePrefix,
+                  filenameSuffix,
+                  shardTemplate,
+                  AvroCoder.of(type, schema),
+                  codec,
+                  metadata));
+        }
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
+        if (windowedWrites) {
+          write = write.withWindowedWrites();
+        }
         return input.apply("Write", write);
       }
 
@@ -940,6 +1037,18 @@ public class AvroIO {
 
     @VisibleForTesting
     AvroSink(
+        FilenamePolicy filenamePolicy,
+        AvroCoder<T> coder,
+        SerializableAvroCodecFactory codec,
+        ImmutableMap<String, Object> metadata) {
+      super(filenamePolicy);
+      this.coder = coder;
+      this.codec = codec;
+      this.metadata = metadata;
+    }
+
+    @VisibleForTesting
+    AvroSink(
         String baseOutputFilename,
         String extension,
         String fileNameTemplate,

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index ae28b62..9b5f130 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -17,34 +17,48 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 
 import javax.annotation.Nullable;
 
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -146,21 +160,165 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    */
   protected final WritableByteChannelFactory writableByteChannelFactory;
 
+
   /**
-   * Base filename for final output files.
+   * A naming policy for output files.
    */
-  protected final ValueProvider<String> baseOutputFilename;
+  public abstract static class FilenamePolicy implements Serializable {
+    /**
+     * Context used for generating a name based on shard number and num shards.
+     * The policy must produce unique filenames for unique {@link Context} 
objects.
+     *
+     * <p>Be careful about adding fields to this as existing strategies will 
not notice the new
+     * fields, and may not produce unique filenames.
+     */
+    public static class Context {
+      private int shardNumber;
+      private int numShards;
+
+
+      public Context(int shardNumber, int numShards) {
+        this.shardNumber = shardNumber;
+        this.numShards = numShards;
+      }
+
+      public int getShardNumber() {
+        return shardNumber;
+      }
+
+
+      public int getNumShards() {
+        return numShards;
+      }
+    }
+
+    /**
+     * Context used for generating a name based on window, pane, shard number, 
and num shards.
+     * The policy must produce unique filenames for unique {@link 
WindowedContext} objects.
+     *
+     * <p>Be careful about adding fields to this as existing strategies will 
not notice the new
+     * fields, and may not produce unique filenames.
+     */
+    public static class WindowedContext {
+      private int shardNumber;
+      private int numShards;
+      private BoundedWindow window;
+      private PaneInfo paneInfo;
+
+      public WindowedContext(
+          BoundedWindow window,
+          PaneInfo paneInfo,
+          int shardNumber,
+          int numShards) {
+        this.window = window;
+        this.paneInfo = paneInfo;
+        this.shardNumber = shardNumber;
+        this.numShards = numShards;
+      }
+
+      public BoundedWindow getWindow() {
+        return window;
+      }
+
+      public PaneInfo getPaneInfo() {
+        return paneInfo;
+      }
+
+      public int getShardNumber() {
+        return shardNumber;
+      }
+
+      public int getNumShards() {
+        return numShards;
+      }
+    }
+
+    /**
+     * When a sink has requested windowed or triggered output, this method 
will be invoked to return
+     * the filename. The {@link WindowedContext} object gives access to the 
window and pane, as
+     * well as sharding information. The policy must return unique and 
consistent filenames
+     * for different windows and panes.
+     */
+    public abstract String windowedFilename(WindowedContext c);
+
+    /**
+     * When a sink has not requested windowed output, this method will be 
invoked to return the
+     * filename. The {@link Context} object only provides sharding 
information, which is used by
+     * the policy to generate unique and consistent filenames.
+     */
+    public abstract String unwindowedFilename(Context c);
+
+    /**
+     * @return The base filename for all output files.
+     */
+    public abstract ValueProvider<String> getBaseOutputFilenameProvider();
+
+    /**
+     * Populates the display data.
+     */
+    public void populateDisplayData(DisplayData.Builder builder) {
+    }
+  }
 
   /**
-   * The extension to be used for the final output files.
+   * A default filename policy.
    */
-  protected final String extension;
+  protected class DefaultFilenamePolicy extends FilenamePolicy {
+    ValueProvider<String> baseOutputFilename;
+    String extension;
+    String fileNamingTemplate;
+
+    public DefaultFilenamePolicy(ValueProvider<String> baseOutputFilename, 
String extension,
+                                 String fileNamingTemplate) {
+      this.baseOutputFilename = baseOutputFilename;
+      if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
+        this.extension = extension + getFileExtension(
+            writableByteChannelFactory.getFilenameSuffix());
+      } else {
+        this.extension = extension;
+      }
+      this.fileNamingTemplate = fileNamingTemplate;
+    }
+
+    @Override
+    public String unwindowedFilename(FilenamePolicy.Context context) {
+      if (context.numShards <= 0) {
+        return null;
+      }
+
+      String suffix = getFileExtension(extension);
+      String filename = IOChannelUtils.constructName(
+          baseOutputFilename.get(), fileNamingTemplate, suffix, 
context.getShardNumber(),
+          context.getNumShards());
+      return filename;
+    }
+
+    @Override
+    public String windowedFilename(FilenamePolicy.WindowedContext c) {
+      throw new UnsupportedOperationException("There is no default policy for 
windowed file"
+          + " output. Please provide an explicit FilenamePolicy to generate 
filenames.");
+    }
+
+    @Override
+    public ValueProvider<String> getBaseOutputFilenameProvider() {
+      return baseOutputFilename;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+    String fileNamePattern = String.format("%s%s%s",
+        baseOutputFilename.isAccessible()
+        ? baseOutputFilename.get() : baseOutputFilename.toString(),
+        fileNamingTemplate, getFileExtension(extension));
+    builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
+      .withLabel("File Name Pattern"));
+    }
+  }
 
   /**
-   * Naming template for output files. See {@link ShardNameTemplate} for a 
description of
-   * possible naming templates.  Default is {@link 
ShardNameTemplate#INDEX_OF_MAX}.
+   * The policy used to generate output filenames.
    */
-  protected final String fileNamingTemplate;
+  protected FilenamePolicy fileNamePolicy;
 
   /**
    * Construct a FileBasedSink with the given base output filename and 
extension. A
@@ -201,20 +359,30 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   public FileBasedSink(ValueProvider<String> baseOutputFilename, String 
extension,
       String fileNamingTemplate, WritableByteChannelFactory 
writableByteChannelFactory) {
     this.writableByteChannelFactory = writableByteChannelFactory;
-    this.baseOutputFilename = baseOutputFilename;
-    if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
-      this.extension = extension + 
getFileExtension(writableByteChannelFactory.getFilenameSuffix());
-    } else {
-      this.extension = extension;
-    }
-    this.fileNamingTemplate = fileNamingTemplate;
+    this.fileNamePolicy = new DefaultFilenamePolicy(baseOutputFilename, 
extension,
+        fileNamingTemplate);
+  }
+
+  public FileBasedSink(FilenamePolicy fileNamePolicy) {
+    this(fileNamePolicy, CompressionType.UNCOMPRESSED);
+
+  }
+
+  public FileBasedSink(FilenamePolicy fileNamePolicy,
+                       WritableByteChannelFactory writableByteChannelFactory) {
+    this.fileNamePolicy = fileNamePolicy;
+    this.writableByteChannelFactory = writableByteChannelFactory;
   }
 
   /**
    * Returns the base output filename for this file based sink.
    */
   public ValueProvider<String> getBaseOutputFilenameProvider() {
-    return baseOutputFilename;
+    return fileNamePolicy.getBaseOutputFilenameProvider();
+  }
+
+  public FilenamePolicy getFileNamePolicy() {
+    return fileNamePolicy;
   }
 
   @Override
@@ -230,13 +398,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-
-    String fileNamePattern = String.format("%s%s%s",
-        baseOutputFilename.isAccessible()
-        ? baseOutputFilename.get() : baseOutputFilename.toString(),
-        fileNamingTemplate, getFileExtension(extension));
-    builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
-      .withLabel("File Name Pattern"));
+    getFileNamePolicy().populateDisplayData(builder);
   }
 
   /**
@@ -286,7 +448,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * constructor arguments.
    *
    * <p>Subclass implementations can change the file naming template by 
supplying a value for
-   * {@link FileBasedSink#fileNamingTemplate}.
+   * fileNamingTemplate.
    *
    * <p>Note that in the case of permanent failure of a bundle's write, no 
clean up of temporary
    * files will occur.
@@ -304,6 +466,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     /** Directory for temporary output files. */
     protected final ValueProvider<String> tempDirectory;
 
+    /** Whether windowed writes are being used. */
+    protected  boolean windowedWrites;
+
     /** Constructs a temporary file path given the temporary directory and a 
filename. */
     protected static String buildTemporaryFilename(String tempDirectory, 
String filename)
         throws IOException {
@@ -361,6 +526,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     private FileBasedWriteOperation(FileBasedSink<T> sink, 
ValueProvider<String> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
+      this.windowedWrites = false;
     }
 
     /**
@@ -371,6 +537,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     @Override
     public abstract FileBasedWriter<T> createWriter(PipelineOptions options) 
throws Exception;
 
+    @Override
+    public void setWindowedWrites(boolean windowedWrites) {
+      this.windowedWrites = windowedWrites;
+    }
+
     /**
      * Initialization of the sink. Default implementation is a no-op. May be 
overridden by subclass
      * implementations to perform initialization of the sink at pipeline 
runtime. This method must
@@ -395,22 +566,55 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * @param writerResults the results of writes (FileResult).
      */
     @Override
-    public void finalize(Iterable<FileResult> writerResults, PipelineOptions 
options)
+    public void finalize(Iterable<FileResult> writerResults,
+                         PipelineOptions options)
         throws Exception {
       // Collect names of temporary files and rename them.
-      List<String> files = new ArrayList<>();
-      for (FileResult result : writerResults) {
-        LOG.debug("Temporary bundle output file {} will be copied.", 
result.getFilename());
-        files.add(result.getFilename());
-      }
-      copyToOutputFiles(files, options);
+      Map<String, String> outputFilenames = 
buildOutputFilenames(writerResults);
+      copyToOutputFiles(outputFilenames, options);
 
+      // Optionally remove temporary files.
       // We remove the entire temporary directory, rather than specifically 
removing the files
       // from writerResults, because writerResults includes only successfully 
completed bundles,
       // and we'd like to clean up the failed ones too.
       // Note that due to GCS eventual consistency, matching files in the temp 
directory is also
       // currently non-perfect and may fail to delete some files.
-      removeTemporaryFiles(files, options);
+      //
+      // When windows or triggers are specified, files are generated 
incrementally so deleting
+      // the entire directory in finalize is incorrect.
+      removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites, options);
+    }
+
+    protected final Map<String, String> 
buildOutputFilenames(Iterable<FileResult> writerResults) {
+      Map<String, String> outputFilenames = new HashMap<>();
+      List<String> files = new ArrayList<>();
+      for (FileResult result : writerResults) {
+        if (result.getDestinationFilename() != null) {
+          outputFilenames.put(result.getFilename(), 
result.getDestinationFilename());
+        } else {
+          files.add(result.getFilename());
+        }
+      }
+
+      // If the user does not specify numShards() (not supported with 
windowing), then the
+      // writerResults won't contain destination filenames, so we dynamically 
generate them here.
+      if (files.size() > 0) {
+        checkArgument(outputFilenames.isEmpty());
+        // Sort files for idempotence.
+        files = Ordering.natural().sortedCopy(files);
+        FilenamePolicy filenamePolicy = getSink().fileNamePolicy;
+        for (int i = 0; i < files.size(); i++) {
+          outputFilenames.put(files.get(i),
+              filenamePolicy.unwindowedFilename(new Context(i, files.size())));
+        }
+      }
+
+      int numDistinctShards = new 
HashSet<String>(outputFilenames.values()).size();
+      checkState(numDistinctShards == outputFilenames.size(),
+         "Only generated %s distinct file names for %s files.",
+         numDistinctShards, outputFilenames.size());
+
+      return outputFilenames;
     }
 
     /**
@@ -425,47 +629,19 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * file-000-of-003.txt, the contents of B will be copied to 
file-001-of-003.txt, etc.
      *
      * @param filenames the filenames of temporary files.
-     * @return a list containing the names of final output files.
      */
-    protected final List<String> copyToOutputFiles(List<String> filenames, 
PipelineOptions options)
+    protected final void copyToOutputFiles(Map<String, String> filenames,
+                                           PipelineOptions options)
         throws IOException {
       int numFiles = filenames.size();
-      // Sort files for idempotence.
-      List<String> srcFilenames = Ordering.natural().sortedCopy(filenames);
-      List<String> destFilenames = generateDestinationFilenames(numFiles);
-
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
-        IOChannelUtils.getFactory(destFilenames.get(0))
-            .copy(srcFilenames, destFilenames);
+        IOChannelFactory channelFactory =
+            IOChannelUtils.getFactory(filenames.values().iterator().next());
+        channelFactory.copy(filenames.keySet(), filenames.values());
       } else {
         LOG.info("No output files to write.");
       }
-
-      return destFilenames;
-    }
-
-    /**
-     * Generate output bundle filenames.
-     */
-    protected final List<String> generateDestinationFilenames(int numFiles) {
-      List<String> destFilenames = new ArrayList<>();
-      String extension = getSink().extension;
-      String baseOutputFilename = getSink().baseOutputFilename.get();
-      String fileNamingTemplate = getSink().fileNamingTemplate;
-
-      String suffix = getFileExtension(extension);
-      for (int i = 0; i < numFiles; i++) {
-        destFilenames.add(IOChannelUtils.constructName(
-            baseOutputFilename, fileNamingTemplate, suffix, i, numFiles));
-      }
-
-      int numDistinctShards = new HashSet<String>(destFilenames).size();
-      checkState(numDistinctShards == numFiles,
-          "Shard name template '%s' only generated %s distinct file names for 
%s files.",
-          fileNamingTemplate, numDistinctShards, numFiles);
-
-      return destFilenames;
     }
 
     /**
@@ -475,7 +651,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or 
otherwise finalize
      * temporary files, this method will remove them.
      */
-    protected final void removeTemporaryFiles(List<String> knownFiles, 
PipelineOptions options)
+    protected final void removeTemporaryFiles(Set<String> knownFiles,
+                                              boolean 
shouldRemoveTemporaryDirectory,
+                                              PipelineOptions options)
         throws IOException {
       String tempDir = tempDirectory.get();
       LOG.debug("Removing temporary bundle output files in {}.", tempDir);
@@ -485,15 +663,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       // directory matching APIs, we remove not only files that the filesystem 
says exist
       // in the directory (which may be incomplete), but also files that are 
known to exist
       // (produced by successfully completed bundles).
+
       // This may still fail to remove temporary outputs of some failed 
bundles, but at least
       // the common case (where all bundles succeed) is guaranteed to be fully 
addressed.
       Set<String> matches = new HashSet<>();
       // TODO: Windows OS cannot resolves and matches '*' in the path,
       // ignore the exception for now to avoid failing the pipeline.
-      try {
-        matches.addAll(factory.match(factory.resolve(tempDir, "*")));
-      } catch (Exception e) {
-        LOG.warn("Failed to match temporary files under: [{}].", tempDir);
+      if (shouldRemoveTemporaryDirectory) {
+        try {
+          matches.addAll(factory.match(factory.resolve(tempDir, "*")));
+        } catch (Exception e) {
+          LOG.warn("Failed to match temporary files under: [{}].", tempDir);
+        }
       }
       Set<String> allMatches = new HashSet<>(matches);
       allMatches.addAll(knownFiles);
@@ -517,7 +698,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      */
     @Override
     public Coder<FileResult> getWriterResultCoder() {
-      return SerializableCoder.of(FileResult.class);
+      return FileResultCoder.of();
     }
 
     /**
@@ -553,8 +734,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      */
     private String id;
 
+    private BoundedWindow window;
+    private PaneInfo paneInfo;
+    private int shard = -1;
+    private int numShards = -1;
+
     /**
-     * The filename of the output bundle - $tempDirectory/$id.
+     * The filename of the output bundle.
      */
     private String filename;
 
@@ -610,8 +796,37 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * Opens the channel.
      */
     @Override
-    public final void open(String uId) throws Exception {
+    public final void openWindowed(String uId,
+                                   BoundedWindow window,
+                                   PaneInfo paneInfo,
+                                   int shard,
+                                   int numShards) throws Exception {
+      if (!getWriteOperation().windowedWrites) {
+        throw new IllegalStateException("openWindowed called a non-windowed 
sink.");
+      }
+      open(uId, window, paneInfo, shard, numShards);
+    }
+
+    @Override
+    public final void openUnwindowed(String uId,
+                                     int shard,
+                                     int numShards) throws Exception {
+      if (getWriteOperation().windowedWrites) {
+        throw new IllegalStateException("openUnwindowed called a windowed 
sink.");
+      }
+      open(uId, null, null, shard, numShards);
+    }
+
+    private void open(String uId,
+                      @Nullable BoundedWindow window,
+                      @Nullable PaneInfo paneInfo,
+                      int shard,
+                      int numShards) throws Exception {
       this.id = uId;
+      this.window = window;
+      this.paneInfo = paneInfo;
+      this.shard = shard;
+      this.numShards = numShards;
       filename = FileBasedWriteOperation.buildTemporaryFilename(
           getWriteOperation().tempDirectory.get(), uId);
       LOG.debug("Opening {}.", filename);
@@ -639,6 +854,13 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
     }
 
+    @Override
+    public void cleanup() throws Exception {
+      if (filename != null) {
+        
IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename));
+      }
+    }
+
     /**
      * Closes the channel and returns the bundle result.
      */
@@ -653,8 +875,17 @@ public abstract class FileBasedSink<T> extends Sink<T> {
           throw new IllegalStateException("Channel should only be closed by 
its owner: " + channel);
         }
       }
-      FileResult result = new FileResult(filename);
-      LOG.debug("Result for bundle {}: {}", this.id, filename);
+
+      FilenamePolicy filenamePolicy = 
getWriteOperation().getSink().fileNamePolicy;
+      String destinationFile;
+      if (window != null) {
+        destinationFile = filenamePolicy.windowedFilename(new WindowedContext(
+            window, paneInfo, shard, numShards));
+      } else {
+        destinationFile =  filenamePolicy.unwindowedFilename(new 
Context(shard, numShards));
+      }
+      FileResult result = new FileResult(filename, destinationFile);
+      LOG.debug("Result for bundle {}: {} {}", this.id, filename, 
destinationFile);
       return result;
     }
 
@@ -670,18 +901,62 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   /**
    * Result of a single bundle write. Contains the filename of the bundle.
    */
-  public static final class FileResult implements Serializable {
+  public static final class FileResult {
     private final String filename;
+    private final String destinationFilename;
 
-    public FileResult(String filename) {
+    public FileResult(String filename, String destinationFilename) {
       this.filename = filename;
+      this.destinationFilename = destinationFilename;
     }
 
     public String getFilename() {
       return filename;
     }
+
+    public String getDestinationFilename() {
+      return destinationFilename;
+    }
+
+  }
+
+  /**
+   * A coder for FileResult objects.
+   */
+  public static final class FileResultCoder extends AtomicCoder<FileResult> {
+    private static final FileResultCoder INSTANCE = new FileResultCoder();
+    private final Coder<String> stringCoder = 
NullableCoder.of(StringUtf8Coder.of());
+
+    @JsonCreator
+    public static FileResultCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(FileResult value, OutputStream outStream, Context 
context)
+        throws IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null value");
+      }
+      stringCoder.encode(value.getFilename(), outStream, context.nested());
+      stringCoder.encode(value.getDestinationFilename(), outStream, 
context.nested());
+    }
+
+    @Override
+    public FileResult decode(InputStream inStream, Context context)
+        throws IOException {
+      return new FileResult(
+          stringCoder.decode(inStream, context.nested()),
+          stringCoder.decode(inStream, context.nested()));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      throw new NonDeterministicException(this, "TableRows are not 
deterministic.");
+    }
   }
 
+
   /**
    * Implementations create instances of {@link WritableByteChannel} used by 
{@link FileBasedSink}
    * and related classes to allow <em>decorating</em>, or otherwise 
transforming, the raw data that

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
index 6742784..d53c6ce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
@@ -23,6 +23,8 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
@@ -63,11 +65,12 @@ import org.apache.beam.sdk.values.PCollection;
  * operation corresponds to. See below for more information about these 
methods and restrictions on
  * their implementation.
  *
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines 
four methods:
- * {@link Writer#open}, which is called once at the start of writing a bundle; 
{@link Writer#write},
- * which writes a single record from the bundle; {@link Writer#close}, which 
is called once at the
- * end of writing a bundle; and {@link Writer#getWriteOperation}, which 
returns the write operation
- * that the writer belongs to.
+ * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines 
several methods:
+ * {@link Writer#openWindowed} and {@link Writer#openUnwindowed}, which are 
called once at the
+ * start of writing a bundle, depending on whether windowed or unwindowed 
output is requested.
+ * {@link Writer#write}, which writes a single record from the bundle; {@link 
Writer#close},
+ * which is called once at the end of writing a bundle; and {@link 
Writer#getWriteOperation},
+ * which returns the write operation that the writer belongs to.
  * </ul>
  *
  * <h2>WriteOperation</h2>
@@ -95,9 +98,10 @@ import org.apache.beam.sdk.values.PCollection;
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple 
times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these 
executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link 
Writer#open} is passed
- * a unique <i>bundle id</i> when it is called by the Write transform, so even 
redundant or retried
- * bundles will have a unique way of identifying their output.
+ * result passed to the WriteOperation's finalize method. Each call to {@link 
Writer#openWindowed}
+ * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when 
it is called by the
+ * Write transform, so even redundant or retried bundles will have a unique 
way of identifying
+ * their output.
  *
  * <p>The bundle id should be used to guarantee that a bundle's output is 
unique. This uniqueness
  * guarantee is important; if a bundle is to be output to a file, for example, 
the name of the file
@@ -174,6 +178,11 @@ public abstract class Sink<T> implements Serializable, 
HasDisplayData {
     public abstract void initialize(PipelineOptions options) throws Exception;
 
     /**
+     * Indicates that the operation will be performing windowed writes.
+     */
+    public abstract void setWindowedWrites(boolean windowedWrites);
+
+    /**
      * Given an Iterable of results from bundle writes, performs finalization 
after writing and
      * closes the sink. Called after all bundle writes are complete.
      *
@@ -200,7 +209,7 @@ public abstract class Sink<T> implements Serializable, 
HasDisplayData {
      * Creates a new {@link Sink.Writer} to write a bundle of the input to the 
sink.
      *
      * <p>The bundle id that the writer will use to uniquely identify its 
output will be passed to
-     * {@link Writer#open}.
+     * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
      *
      * <p>Must not mutate the state of the WriteOperation.
      */
@@ -218,9 +227,10 @@ public abstract class Sink<T> implements Serializable, 
HasDisplayData {
   }
 
   /**
-   * A Writer writes a bundle of elements from a PCollection to a sink. {@link 
Writer#open} is
-   * called before writing begins and {@link Writer#close} is called after all 
elements in the
-   * bundle have been written. {@link Writer#write} writes an element to the 
sink.
+   * A Writer writes a bundle of elements from a PCollection to a sink.
+   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called 
before writing begins
+   * and {@link Writer#close} is called after all elements in the bundle have 
been written.
+   * {@link Writer#write} writes an element to the sink.
    *
    * <p>Note that any access to static members or methods of a Writer must be 
thread-safe, as
    * multiple instances of a Writer may be instantiated in different threads 
on the same worker.
@@ -238,8 +248,25 @@ public abstract class Sink<T> implements Serializable, 
HasDisplayData {
      * <p>The unique id that is given to open should be used to ensure that 
the writer's output does
      * not interfere with the output of other Writers, as a bundle may be 
executed many times for
      * fault tolerance. See {@link Sink} for more information about bundle ids.
+     *
+     * <p>The window and paneInfo arguments are populated when windowed writes are
+     * requested. shard and numShards are populated for the case of static sharding.
+     * In cases where the runner is dynamically picking sharding, shard and numShards
+     * might both be set to -1.
+     */
+    public abstract void openWindowed(String uId,
+                                      BoundedWindow window,
+                                      PaneInfo paneInfo,
+                                      int shard,
+                                      int numShards) throws Exception;
+
+    /**
+     * Perform bundle initialization for the case where the file is written 
unwindowed.
      */
-    public abstract void open(String uId) throws Exception;
+    public abstract void openUnwindowed(String uId,
+                                        int shard,
+                                        int numShards) throws Exception;
+
+    public abstract void cleanup() throws Exception;
 
     /**
      * Called for each value in the bundle.
@@ -262,5 +289,7 @@ public abstract class Sink<T> implements Serializable, 
HasDisplayData {
      * Returns the write operation this writer belongs to.
      */
     public abstract WriteOperation<T, WriteT> getWriteOperation();
+
+
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 58b55a9..ea80639 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -85,6 +86,14 @@ import org.apache.beam.sdk.values.PDone;
  * filename or sharded filename pattern of the form
  * {@code "gs://<bucket>/<filepath>"}).
  *
+ * <p>By default, all input is put into the global window before writing. If per-window
+ * writes are desired - for example, when using a streaming runner -
+ * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering
+ * to be preserved. When producing windowed writes, the number of output shards must be
+ * set explicitly using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may
+ * set this for you to a runner-chosen value, so you may not need to set it yourself. A
+ * {@link FilenamePolicy} must be set, and unique windows and triggers must produce
+ * unique filenames.
+ *
  * <p>Any existing files with the same names as generated output files
  * will be overwritten.
  *
@@ -352,6 +361,10 @@ public class TextIO {
       return new Bound().to(prefix);
     }
 
+    public static Bound to(FilenamePolicy filenamePolicy) {
+      return new Bound().to(filenamePolicy);
+
+    }
     /**
      * Like {@link #to(String)}, but with a {@link ValueProvider}.
      */
@@ -479,6 +492,12 @@ public class TextIO {
       /** An option to indicate if output validation is desired. Default is 
true. */
       private final boolean validate;
 
+      /** A policy for naming output files. */
+      private final FilenamePolicy filenamePolicy;
+
+      /** Whether to write windowed output files. */
+      private boolean windowedWrites;
+
       /**
        * The {@link WritableByteChannelFactory} to be used by the {@link 
FileBasedSink}. Default is
        * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
@@ -487,13 +506,15 @@ public class TextIO {
 
       private Bound() {
         this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE, true,
-            FileBasedSink.CompressionType.UNCOMPRESSED);
+            FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
       }
 
       private Bound(String name, ValueProvider<String> filenamePrefix, String 
filenameSuffix,
           @Nullable String header, @Nullable String footer, int numShards,
           String shardTemplate, boolean validate,
-          WritableByteChannelFactory writableByteChannelFactory) {
+          WritableByteChannelFactory writableByteChannelFactory,
+          FilenamePolicy filenamePolicy,
+          boolean windowedWrites) {
         super(name);
         this.header = header;
         this.footer = footer;
@@ -504,6 +525,8 @@ public class TextIO {
         this.validate = validate;
         this.writableByteChannelFactory =
             firstNonNull(writableByteChannelFactory, 
FileBasedSink.CompressionType.UNCOMPRESSED);
+        this.filenamePolicy = filenamePolicy;
+        this.windowedWrites = windowedWrites;
       }
 
       /**
@@ -518,7 +541,7 @@ public class TextIO {
         validateOutputComponent(filenamePrefix);
         return new Bound(name, StaticValueProvider.of(filenamePrefix), 
filenameSuffix,
             header, footer, numShards, shardTemplate, validate,
-            writableByteChannelFactory);
+            writableByteChannelFactory, filenamePolicy, windowedWrites);
       }
 
       /**
@@ -526,7 +549,15 @@ public class TextIO {
        */
       public Bound to(ValueProvider<String> filenamePrefix) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
+      }
+
+       /**
+        * Like {@link #to(String)}, but with a {@link FilenamePolicy}.
+        */
+      public Bound to(FilenamePolicy filenamePolicy) {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
       }
 
       /**
@@ -540,7 +571,7 @@ public class TextIO {
       public Bound withSuffix(String nameExtension) {
         validateOutputComponent(nameExtension);
         return new Bound(name, filenamePrefix, nameExtension, header, footer, 
numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
       }
 
       /**
@@ -560,7 +591,7 @@ public class TextIO {
       public Bound withNumShards(int numShards) {
         checkArgument(numShards >= 0);
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
       }
 
       /**
@@ -573,7 +604,7 @@ public class TextIO {
        */
       public Bound withShardNameTemplate(String shardTemplate) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
       }
 
       /**
@@ -591,7 +622,7 @@ public class TextIO {
        */
       public Bound withoutSharding() {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
1, "",
-            validate, writableByteChannelFactory);
+            validate, writableByteChannelFactory, filenamePolicy, 
windowedWrites);
       }
 
       /**
@@ -606,7 +637,7 @@ public class TextIO {
        */
       public Bound withoutValidation() {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
-            shardTemplate, false, writableByteChannelFactory);
+            shardTemplate, false, writableByteChannelFactory, filenamePolicy, 
windowedWrites);
       }
 
       /**
@@ -621,7 +652,7 @@ public class TextIO {
        */
       public Bound withHeader(@Nullable String header) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
       }
 
       /**
@@ -636,7 +667,7 @@ public class TextIO {
        */
       public Bound withFooter(@Nullable String footer) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
       }
 
       /**
@@ -653,22 +684,39 @@ public class TextIO {
       public Bound withWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory) {
         return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
-            shardTemplate, validate, writableByteChannelFactory);
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, windowedWrites);
+      }
+
+      public Bound withWindowedWrites() {
+        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 
numShards,
+            shardTemplate, validate, writableByteChannelFactory, 
filenamePolicy, true);
       }
 
       @Override
       public PDone expand(PCollection<String> input) {
-        if (filenamePrefix == null) {
+        if (filenamePolicy == null && filenamePrefix == null) {
+          throw new IllegalStateException(
+              "need to set the filename prefix of an TextIO.Write transform");
+        }
+        if (filenamePolicy != null && filenamePrefix != null) {
           throw new IllegalStateException(
-              "need to set the filename prefix of a TextIO.Write transform");
+              "cannot set both a filename policy and a filename prefix");
+        }
+        org.apache.beam.sdk.io.Write<String> write = null;
+        if (filenamePolicy != null) {
+         write = org.apache.beam.sdk.io.Write.to(
+             new TextSink(filenamePolicy, header, footer, 
writableByteChannelFactory));
+        } else {
+          write = org.apache.beam.sdk.io.Write.to(
+              new TextSink(filenamePrefix, filenameSuffix, header, footer, 
shardTemplate,
+                  writableByteChannelFactory));
         }
-        org.apache.beam.sdk.io.Write<String> write =
-            org.apache.beam.sdk.io.Write.to(
-                new TextSink(filenamePrefix, filenameSuffix, header, footer, 
shardTemplate,
-                    writableByteChannelFactory));
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
+        if (windowedWrites) {
+          write = write.withWindowedWrites();
+        }
         return input.apply("Write", write);
       }
 
@@ -676,8 +724,11 @@ public class TextIO {
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
 
-        String prefixString = filenamePrefix.isAccessible()
-            ? filenamePrefix.get() : filenamePrefix.toString();
+        String prefixString = "";
+        if (filenamePrefix != null) {
+          prefixString = filenamePrefix.isAccessible()
+              ? filenamePrefix.get() : filenamePrefix.toString();
+        }
         builder
             .addIfNotNull(DisplayData.item("filePrefix", prefixString)
               .withLabel("Output File Prefix"))
@@ -1023,6 +1074,13 @@ public class TextIO {
     @Nullable private final String footer;
 
     @VisibleForTesting
+    TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable 
String footer,
+             WritableByteChannelFactory writableByteChannelFactory) {
+      super(filenamePolicy, writableByteChannelFactory);
+      this.header = header;
+      this.footer = footer;
+    }
+    @VisibleForTesting
     TextSink(
         ValueProvider<String> baseOutputFilename,
         String extension,

Reply via email to