Revert "Migrate TextIO.Write to a custom sink"

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115351833


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3904c907
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3904c907
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3904c907

Branch: refs/heads/master
Commit: 3904c9074e66733686285d09ce5068d28f303dd8
Parents: 45f5951
Author: sgmc <[email protected]>
Authored: Tue Feb 23 09:53:38 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:26 2016 -0800

----------------------------------------------------------------------
 .../google/cloud/dataflow/sdk/io/TextIO.java    | 187 ++++++++++---------
 .../sdk/runners/DataflowPipelineRunner.java     | 108 +----------
 .../sdk/runners/DataflowPipelineTranslator.java |   5 +
 .../sdk/runners/dataflow/TextIOTranslator.java  |  91 +++++++++
 .../cloud/dataflow/sdk/io/TextIOTest.java       |  22 +++
 .../sdk/runners/DataflowPipelineRunnerTest.java |  21 ++-
 .../runners/DataflowPipelineTranslatorTest.java |   4 +-
 .../dataflow/sdk/runners/TransformTreeTest.java |   9 +-
 8 files changed, 238 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
index d342f25..0bb2861 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
@@ -26,9 +26,11 @@ import com.google.cloud.dataflow.sdk.io.Read.Bounded;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.worker.TextSink;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MimeTypes;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PDone;
 import com.google.cloud.dataflow.sdk.values.PInput;
@@ -37,13 +39,10 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.regex.Pattern;
 
@@ -67,7 +66,7 @@ import javax.annotation.Nullable;
  *
  * <p>See the following examples:
  *
- * <pre>{@code
+ * <pre> {@code
  * Pipeline p = ...;
  *
  * // A simple Read of a local file (only runs locally):
@@ -80,7 +79,7 @@ import javax.annotation.Nullable;
  *     p.apply(TextIO.Read.named("ReadNumbers")
  *                        .from("gs://my_bucket/path/to/numbers-*.txt")
  *                        .withCoder(TextualIntegerCoder.of()));
- * }</pre>
+ * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more text files, use
  * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
@@ -95,7 +94,7 @@ import javax.annotation.Nullable;
  * will be overwritten.
  *
  * <p>For example:
- * <pre>{@code
+ * <pre> {@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<String> lines = ...;
  * lines.apply(TextIO.Write.to("/path/to/file.txt"));
@@ -107,7 +106,7 @@ import javax.annotation.Nullable;
  *                           .to("gs://my_bucket/path/to/numbers")
  *                           .withSuffix(".txt")
  *                           .withCoder(TextualIntegerCoder.of()));
- * }</pre>
+ * } </pre>
  *
  * <h3>Permissions</h3>
  * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read 
and write text files
@@ -478,6 +477,9 @@ public class TextIO {
       /** Requested number of shards. 0 for automatic. */
       private final int numShards;
 
+      /** Insert a shuffle before writing to decouple parallelism when 
numShards != 0. */
+      private final boolean forceReshard;
+
       /** The shard template of each file written, combined with prefix and 
suffix. */
       private final String shardTemplate;
 
@@ -485,16 +487,17 @@ public class TextIO {
       private final boolean validate;
 
       Bound(Coder<T> coder) {
-        this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true);
+        this(null, null, "", coder, 0, true, ShardNameTemplate.INDEX_OF_MAX, 
true);
       }
 
       private Bound(String name, String filenamePrefix, String filenameSuffix, 
Coder<T> coder,
-          int numShards, String shardTemplate, boolean validate) {
+          int numShards, boolean forceReshard, String shardTemplate, boolean 
validate) {
         super(name);
         this.coder = coder;
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
         this.numShards = numShards;
+        this.forceReshard = forceReshard;
         this.shardTemplate = shardTemplate;
         this.validate = validate;
       }
@@ -507,7 +510,7 @@ public class TextIO {
        */
       public Bound<T> named(String name) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
-            shardTemplate, validate);
+            forceReshard, shardTemplate, validate);
       }
 
       /**
@@ -520,7 +523,7 @@ public class TextIO {
        */
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
             shardTemplate, validate);
       }
 
@@ -534,7 +537,7 @@ public class TextIO {
        */
       public Bound<T> withSuffix(String nameExtension) {
         validateOutputComponent(nameExtension);
-        return new Bound<>(name, filenamePrefix, nameExtension, coder, 
numShards,
+        return new Bound<>(name, filenamePrefix, nameExtension, coder, 
numShards, forceReshard,
             shardTemplate, validate);
       }
 
@@ -553,8 +556,30 @@ public class TextIO {
        * @see ShardNameTemplate
        */
       public Bound<T> withNumShards(int numShards) {
+        return withNumShards(numShards, forceReshard);
+      }
+
+      /**
+       * Returns a transform for writing to text files that's like this one but
+       * that uses the provided shard count.
+       *
+       * <p>Constraining the number of shards is likely to reduce
+       * the performance of a pipeline. If forceReshard is true, the output
+       * will be shuffled to obtain the desired sharding. If it is false,
+       * data will not be reshuffled, but parallelism of preceding stages
+       * may be constrained. Setting this value is not recommended
+       * unless you require a specific number of output files.
+       *
+       * <p>Does not modify this object.
+       *
+       * @param numShards the number of shards to use, or 0 to let the system
+       *                  decide.
+       * @param forceReshard whether to force a reshard to obtain the desired 
sharding.
+       * @see ShardNameTemplate
+       */
+      private Bound<T> withNumShards(int numShards, boolean forceReshard) {
         Preconditions.checkArgument(numShards >= 0);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
             shardTemplate, validate);
       }
 
@@ -567,7 +592,7 @@ public class TextIO {
        * @see ShardNameTemplate
        */
       public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
             shardTemplate, validate);
       }
 
@@ -585,7 +610,25 @@ public class TextIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutSharding() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", 
validate);
+        return withoutSharding(forceReshard);
+      }
+
+      /**
+       * Returns a transform for writing to text files that's like this one but
+       * that forces a single file as output.
+       *
+       * <p>Constraining the number of shards is likely to reduce
+       * the performance of a pipeline. Using this setting is not recommended
+       * unless you truly require a single output file.
+       *
+       * <p>This is a shortcut for
+       * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")}
+       *
+       * <p>Does not modify this object.
+       */
+      private Bound<T> withoutSharding(boolean forceReshard) {
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, 
forceReshard, "",
+            validate);
       }
 
       /**
@@ -597,7 +640,7 @@ public class TextIO {
        * @param <X> the type of the elements of the input {@link PCollection}
        */
       public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
             shardTemplate, validate);
       }
 
@@ -612,7 +655,7 @@ public class TextIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutValidation() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
             shardTemplate, false);
       }
 
@@ -622,13 +665,14 @@ public class TextIO {
           throw new IllegalStateException(
               "need to set the filename prefix of a TextIO.Write transform");
         }
-
-        // Note that custom sinks currently do not expose sharding controls.
-        // Thus pipeline runner writers need to individually add support 
internally to
-        // apply user requested sharding limits.
-        return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
-            new TextSink<>(
-                filenamePrefix, filenameSuffix, shardTemplate, coder)));
+        if (numShards > 0 && forceReshard) {
+          // Reshard and re-apply a version of this write without resharding.
+          return input
+              .apply(new FileBasedSink.ReshardForWrite<T>())
+              .apply(withNumShards(numShards, false));
+        } else {
+          return PDone.in(input.getPipeline());
+        }
       }
 
       /**
@@ -666,6 +710,17 @@ public class TextIO {
       public boolean needsValidation() {
         return validate;
       }
+
+      static {
+        DirectPipelineRunner.registerDefaultTransformEvaluator(
+            Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
+              @Override
+              public void evaluate(
+                  Bound transform, DirectPipelineRunner.EvaluationContext 
context) {
+                evaluateWriteHelper(transform, context);
+              }
+            });
+      }
     }
   }
 
@@ -923,70 +978,24 @@ public class TextIO {
     }
   }
 
-  /**
-   * A {@link FileBasedSink} for text files. Produces text files with the new 
line separator
-   * {@code '\n'} represented in {@code UTF-8} format as the record separator.
-   * Each record (including the last) is terminated.
-   */
-  @VisibleForTesting
-  static class TextSink<T> extends FileBasedSink<T> {
-    private final Coder<T> coder;
-
-    @VisibleForTesting
-    TextSink(
-        String baseOutputFilename, String extension, String fileNameTemplate, 
Coder<T> coder) {
-      super(baseOutputFilename, extension, fileNameTemplate);
-      this.coder = coder;
+  private static <T> void evaluateWriteHelper(
+      Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext 
context) {
+    List<T> elems = context.getPCollection(context.getInput(transform));
+    int numShards = transform.numShards;
+    if (numShards < 1) {
+      // System gets to choose. For direct mode, choose 1.
+      numShards = 1;
     }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<T> 
createWriteOperation(PipelineOptions options) {
-      return new TextWriteOperation<>(this, coder);
-    }
-
-    /**
-     * A {@link 
com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for text files.
-     */
-    private static class TextWriteOperation<T> extends 
FileBasedWriteOperation<T> {
-      private final Coder<T> coder;
-
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder) {
-        super(sink);
-        this.coder = coder;
-      }
-
-      @Override
-      public FileBasedWriter<T> createWriter(PipelineOptions options) throws 
Exception {
-        return new TextWriter<>(this, coder);
-      }
-    }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter 
FileBasedWriter}
-     * for text files.
-     */
-    private static class TextWriter<T> extends FileBasedWriter<T> {
-      private static final byte[] NEWLINE = 
"\n".getBytes(StandardCharsets.UTF_8);
-      private final Coder<T> coder;
-      private OutputStream out;
-
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> 
coder) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.TEXT;
-        this.coder = coder;
-      }
-
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws 
Exception {
-        out = Channels.newOutputStream(channel);
-      }
-
-      @Override
-      public void write(T value) throws Exception {
-        coder.encode(value, out, Context.OUTER);
-        out.write(NEWLINE);
-      }
+    TextSink<WindowedValue<T>> writer = TextSink.createForDirectPipelineRunner(
+        transform.filenamePrefix, transform.getShardNameTemplate(), 
transform.filenameSuffix,
+        numShards, true, null, null, transform.coder);
+    try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) {
+      for (T elem : elems) {
+        sink.add(WindowedValue.valueInGlobalWindow(elem));
+      }
+    } catch (IOException exn) {
+      throw new RuntimeException(
+          "unable to write to output file \"" + transform.filenamePrefix + 
"\"", exn);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 396d308..5a57f7f 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -339,7 +339,6 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
       builder.put(Window.Bound.class, AssignWindows.class);
       builder.put(Write.Bound.class, BatchWrite.class);
       builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
-      builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class);
       if (options.getExperiments() == null
           || !options.getExperiments().contains("disable_ism_side_input")) {
         builder.put(View.AsMap.class, BatchViewAsMap.class);
@@ -1996,111 +1995,6 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
 
   /**
    * Specialized implementation which overrides
-   * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write.Bound 
TextIO.Write.Bound} with
-   * a native sink instead of a custom sink as workaround until custom sinks
-   * have support for sharding controls.
-   */
-  private static class BatchTextIOWrite<T> extends PTransform<PCollection<T>, 
PDone> {
-    private final TextIO.Write.Bound<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowPipelineRunner#apply()
-    public BatchTextIOWrite(DataflowPipelineRunner runner, 
TextIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      if (transform.getNumShards() > 0) {
-        return input
-            .apply(new ReshardForWrite<T>())
-            .apply(new BatchTextIONativeWrite<>(transform));
-      } else {
-        return transform.apply(input);
-      }
-    }
-  }
-
-  /**
-   * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} 
as a way
-   * to provide the native definition of the Text sink.
-   */
-  private static class BatchTextIONativeWrite<T> extends 
PTransform<PCollection<T>, PDone> {
-    private final TextIO.Write.Bound<T> transform;
-    public BatchTextIONativeWrite(TextIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      return PDone.in(input.getPipeline());
-    }
-
-    static {
-      DataflowPipelineTranslator.registerTransformTranslator(
-          BatchTextIONativeWrite.class, new 
BatchTextIONativeWriteTranslator());
-    }
-  }
-
-  /**
-   * TextIO.Write.Bound support code for the Dataflow backend when applying 
parallelism limits
-   * through user requested sharding limits.
-   */
-  private static class BatchTextIONativeWriteTranslator
-      implements TransformTranslator<BatchTextIONativeWrite<?>> {
-    @SuppressWarnings("unchecked")
-    @Override
-    public void translate(@SuppressWarnings("rawtypes") BatchTextIONativeWrite 
transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, transform.transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        BatchTextIONativeWrite<T> transform,
-        TextIO.Write.Bound<T> originalTransform,
-        TranslationContext context) {
-      // Note that the original transform can not be used during add step/add 
input
-      // and is only passed in to get properties from it.
-
-      checkState(originalTransform.getNumShards() > 0,
-          "Native TextSink is expected to only be used when sharding controls 
are required.");
-
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-
-      // TODO: drop this check when server supports alternative templates.
-      switch (originalTransform.getShardTemplate()) {
-        case ShardNameTemplate.INDEX_OF_MAX:
-          break;  // supported by server
-        case "":
-          // Empty shard template allowed - forces single output.
-          Preconditions.checkArgument(originalTransform.getNumShards() <= 1,
-              "Num shards must be <= 1 when using an empty sharding template");
-          break;
-        default:
-          throw new UnsupportedOperationException("Shard template "
-              + originalTransform.getShardTemplate()
-              + " not yet supported by Dataflow service");
-      }
-
-      // TODO: How do we want to specify format and
-      // format-specific properties?
-      context.addInput(PropertyNames.FORMAT, "text");
-      context.addInput(PropertyNames.FILENAME_PREFIX, 
originalTransform.getFilenamePrefix());
-      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE,
-          originalTransform.getShardNameTemplate());
-      context.addInput(PropertyNames.FILENAME_SUFFIX, 
originalTransform.getFilenameSuffix());
-      context.addInput(PropertyNames.VALIDATE_SINK, 
originalTransform.needsValidation());
-      context.addInput(PropertyNames.NUM_SHARDS, (long) 
originalTransform.getNumShards());
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(originalTransform.getCoder()));
-
-    }
-  }
-
-  /**
-   * Specialized implementation which overrides
    * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound 
AvroIO.Write.Bound} with
    * a native sink instead of a custom sink as workaround until custom sinks
    * have support for sharding controls.
@@ -2194,7 +2088,9 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
       context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, 
originalTransform.getShardTemplate());
       context.addInput(PropertyNames.FILENAME_SUFFIX, 
originalTransform.getFilenameSuffix());
       context.addInput(PropertyNames.VALIDATE_SINK, 
originalTransform.needsValidation());
+
       context.addInput(PropertyNames.NUM_SHARDS, (long) 
originalTransform.getNumShards());
+
       context.addEncodingInput(
           WindowedValue.getValueOnlyCoder(
               AvroCoder.of(originalTransform.getType(), 
originalTransform.getSchema())));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index 22ec3bb..885260e 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -44,12 +44,14 @@ import com.google.cloud.dataflow.sdk.coders.IterableCoder;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO;
 import com.google.cloud.dataflow.sdk.io.PubsubIO;
 import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.StreamingOptions;
 import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
 import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
+import com.google.cloud.dataflow.sdk.runners.dataflow.TextIOTranslator;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -1035,6 +1037,9 @@ public class DataflowPipelineTranslator {
         DataflowPipelineRunner.StreamingPubsubIOWrite.class,
         new PubsubIOTranslator.WriteTranslator());
 
+    registerTransformTranslator(
+        TextIO.Write.Bound.class, new TextIOTranslator.WriteTranslator());
+
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
new file mode 100644
index 0000000..d6c96c3
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.runners.dataflow;
+
+import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
+import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
+import com.google.cloud.dataflow.sdk.util.PathValidator;
+import com.google.cloud.dataflow.sdk.util.PropertyNames;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.common.base.Preconditions;
+
+/**
+ * TextIO transform support code for the Dataflow backend.
+ */
+public class TextIOTranslator {
+  /**
+   * Implements TextIO Write translation for the Dataflow backend.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public static class WriteTranslator implements 
TransformTranslator<TextIO.Write.Bound> {
+    @Override
+    public void translate(
+        TextIO.Write.Bound transform,
+        TranslationContext context) {
+      translateWriteHelper(transform, context);
+    }
+
+    private <T> void translateWriteHelper(
+        TextIO.Write.Bound<T> transform,
+        TranslationContext context) {
+      if (context.getPipelineOptions().isStreaming()) {
+        throw new IllegalArgumentException("TextIO not supported in streaming 
mode.");
+      }
+
+      PathValidator validator = 
context.getPipelineOptions().getPathValidator();
+      String filenamePrefix = validator.validateOutputFilePrefixSupported(
+          transform.getFilenamePrefix());
+
+      context.addStep(transform, "ParallelWrite");
+      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+
+      // TODO: drop this check when server supports alternative templates.
+      switch (transform.getShardTemplate()) {
+        case ShardNameTemplate.INDEX_OF_MAX:
+          break;  // supported by server
+        case "":
+          // Empty shard template allowed - forces single output.
+          Preconditions.checkArgument(transform.getNumShards() <= 1,
+              "Num shards must be <= 1 when using an empty sharding template");
+          break;
+        default:
+          throw new UnsupportedOperationException("Shard template "
+              + transform.getShardTemplate()
+              + " not yet supported by Dataflow service");
+      }
+
+      // TODO: How do we want to specify format and
+      // format-specific properties?
+      context.addInput(PropertyNames.FORMAT, "text");
+      context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix);
+      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE,
+          transform.getShardNameTemplate());
+      context.addInput(PropertyNames.FILENAME_SUFFIX, 
transform.getFilenameSuffix());
+      context.addInput(PropertyNames.VALIDATE_SINK, 
transform.needsValidation());
+
+      long numShards = transform.getNumShards();
+      if (numShards > 0) {
+        context.addInput(PropertyNames.NUM_SHARDS, numShards);
+      }
+
+      context.addEncodingInput(
+          WindowedValue.getValueOnlyCoder(transform.getCoder()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
index 0a8e381..6ad81e4 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
@@ -264,6 +264,28 @@ public class TextIOTest {
   }
 
   @Test
+  public void testWriteSharded() throws IOException {
+    File outFolder = tmpFolder.newFolder();
+    String filename = outFolder.toPath().resolve("output").toString();
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<String> input =
+        p.apply(Create.of(Arrays.asList(LINES_ARRAY))
+            .withCoder(StringUtf8Coder.of()));
+
+    input.apply(TextIO.Write.to(filename).withNumShards(2).withSuffix(".txt"));
+
+    p.run();
+
+    String[] files = outFolder.list();
+
+    assertThat(Arrays.asList(files),
+        containsInAnyOrder("output-00000-of-00002.txt",
+                           "output-00001-of-00002.txt"));
+  }
+
+  @Test
   public void testWriteNamed() {
     {
       PTransform<PCollection<String>, PDone> transform1 =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
index c5f2d3f..c7175cb 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
@@ -453,12 +453,16 @@ public class DataflowPipelineRunnerTest {
 
   @Test
   public void testNonGcsFilePathInWriteFailure() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = 
p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+    p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"))
+        .apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("expected a valid 'gs://' path but was 
given"));
-    pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
+    p.run();
+    assertValidJob(jobCaptor.getValue());
   }
 
   @Test
@@ -478,12 +482,17 @@ public class DataflowPipelineRunnerTest {
 
   @Test
   public void testMultiSlashGcsFileWritePath() throws IOException {
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = 
p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+    p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"))
+        .apply(TextIO.Write.named("WriteInvalidGcsFile")
+            .to("gs://bucket/tmp//file"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("consecutive slashes");
-    
pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
+    p.run();
+    assertValidJob(jobCaptor.getValue());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
index 72090a0..b9c94ad 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -403,7 +403,7 @@ public class DataflowPipelineTranslatorTest {
     pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
         .apply(ParDo.of(new NoOpFn()))
         .apply(new EmbeddedTransform(predefinedStep.clone()))
-        .apply(ParDo.of(new NoOpFn()));
+        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
     Job job = translator.translate(
         pipeline, pipeline.getRunner(), 
Collections.<DataflowPackage>emptyList()).getJob();
 
@@ -456,7 +456,7 @@ public class DataflowPipelineTranslatorTest {
     Job job = translator.translate(
         pipeline, pipeline.getRunner(), 
Collections.<DataflowPackage>emptyList()).getJob();
 
-    assertEquals(13, job.getSteps().size());
+    assertEquals(3, job.getSteps().size());
     Step step = job.getSteps().get(1);
     assertEquals(stepName, getString(step.getProperties(), 
PropertyNames.USER_NAME));
     return step;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
index 68e1db1..f1b7cd7 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
@@ -28,7 +28,6 @@ import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.io.Write;
 import com.google.cloud.dataflow.sdk.transforms.Count;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
@@ -134,12 +133,9 @@ public class TransformTreeTest {
           assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
-        } else if (transform instanceof Write.Bound) {
-          assertTrue(visited.add(TransformsSeen.WRITE));
-          assertNotNull(node.getEnclosingNode());
-          assertTrue(node.isCompositeNode());
         }
         assertThat(transform, not(instanceOf(Read.Bounded.class)));
+        assertThat(transform, not(instanceOf(TextIO.Write.Bound.class)));
       }
 
       @Override
@@ -155,9 +151,10 @@ public class TransformTreeTest {
         PTransform<?, ?> transform = node.getTransform();
         // Pick is a composite, should not be visited here.
         assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
-        assertThat(transform, not(instanceOf(Write.Bound.class)));
         if (transform instanceof Read.Bounded) {
           assertTrue(visited.add(TransformsSeen.READ));
+        } else if (transform instanceof TextIO.Write.Bound) {
+          assertTrue(visited.add(TransformsSeen.WRITE));
         }
       }
 

Reply via email to