Migrate TextIO.Write to a custom sink

Note: because custom sinks do not yet expose sharding controls, each pipeline
runner must itself implement support for user-requested sharding limits.

The Google Cloud Dataflow pipeline runner supports sharding limits (added in this commit via DataflowPipelineRunner).

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115310814


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7b5189c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7b5189c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7b5189c

Branch: refs/heads/master
Commit: d7b5189c5708b48308060dd40d6f3ab073759d28
Parents: 6b372ec
Author: lcwik <[email protected]>
Authored: Mon Feb 22 23:59:46 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:26 2016 -0800

----------------------------------------------------------------------
 .../google/cloud/dataflow/sdk/io/TextIO.java    | 187 +++++++++----------
 .../sdk/runners/DataflowPipelineRunner.java     | 108 ++++++++++-
 .../sdk/runners/DataflowPipelineTranslator.java |   5 -
 .../sdk/runners/dataflow/TextIOTranslator.java  |  91 ---------
 .../cloud/dataflow/sdk/io/TextIOTest.java       |  22 ---
 .../sdk/runners/DataflowPipelineRunnerTest.java |  21 +--
 .../runners/DataflowPipelineTranslatorTest.java |   4 +-
 .../dataflow/sdk/runners/TransformTreeTest.java |   9 +-
 8 files changed, 209 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
index 0bb2861..d342f25 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
@@ -26,11 +26,9 @@ import com.google.cloud.dataflow.sdk.io.Read.Bounded;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.worker.TextSink;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
+import com.google.cloud.dataflow.sdk.util.MimeTypes;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PDone;
 import com.google.cloud.dataflow.sdk.values.PInput;
@@ -39,10 +37,13 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
-import java.util.List;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
 import java.util.NoSuchElementException;
 import java.util.regex.Pattern;
 
@@ -66,7 +67,7 @@ import javax.annotation.Nullable;
  *
  * <p>See the following examples:
  *
- * <pre> {@code
+ * <pre>{@code
  * Pipeline p = ...;
  *
  * // A simple Read of a local file (only runs locally):
@@ -79,7 +80,7 @@ import javax.annotation.Nullable;
  *     p.apply(TextIO.Read.named("ReadNumbers")
  *                        .from("gs://my_bucket/path/to/numbers-*.txt")
  *                        .withCoder(TextualIntegerCoder.of()));
- * } </pre>
+ * }</pre>
  *
  * <p>To write a {@link PCollection} to one or more text files, use
  * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
@@ -94,7 +95,7 @@ import javax.annotation.Nullable;
  * will be overwritten.
  *
  * <p>For example:
- * <pre> {@code
+ * <pre>{@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<String> lines = ...;
  * lines.apply(TextIO.Write.to("/path/to/file.txt"));
@@ -106,7 +107,7 @@ import javax.annotation.Nullable;
  *                           .to("gs://my_bucket/path/to/numbers")
  *                           .withSuffix(".txt")
  *                           .withCoder(TextualIntegerCoder.of()));
- * } </pre>
+ * }</pre>
  *
  * <h3>Permissions</h3>
  * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read 
and write text files
@@ -477,9 +478,6 @@ public class TextIO {
       /** Requested number of shards. 0 for automatic. */
       private final int numShards;
 
-      /** Insert a shuffle before writing to decouple parallelism when 
numShards != 0. */
-      private final boolean forceReshard;
-
       /** The shard template of each file written, combined with prefix and 
suffix. */
       private final String shardTemplate;
 
@@ -487,17 +485,16 @@ public class TextIO {
       private final boolean validate;
 
       Bound(Coder<T> coder) {
-        this(null, null, "", coder, 0, true, ShardNameTemplate.INDEX_OF_MAX, 
true);
+        this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true);
       }
 
       private Bound(String name, String filenamePrefix, String filenameSuffix, 
Coder<T> coder,
-          int numShards, boolean forceReshard, String shardTemplate, boolean 
validate) {
+          int numShards, String shardTemplate, boolean validate) {
         super(name);
         this.coder = coder;
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
         this.numShards = numShards;
-        this.forceReshard = forceReshard;
         this.shardTemplate = shardTemplate;
         this.validate = validate;
       }
@@ -510,7 +507,7 @@ public class TextIO {
        */
       public Bound<T> named(String name) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
-            forceReshard, shardTemplate, validate);
+            shardTemplate, validate);
       }
 
       /**
@@ -523,7 +520,7 @@ public class TextIO {
        */
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
             shardTemplate, validate);
       }
 
@@ -537,7 +534,7 @@ public class TextIO {
        */
       public Bound<T> withSuffix(String nameExtension) {
         validateOutputComponent(nameExtension);
-        return new Bound<>(name, filenamePrefix, nameExtension, coder, 
numShards, forceReshard,
+        return new Bound<>(name, filenamePrefix, nameExtension, coder, 
numShards,
             shardTemplate, validate);
       }
 
@@ -556,30 +553,8 @@ public class TextIO {
        * @see ShardNameTemplate
        */
       public Bound<T> withNumShards(int numShards) {
-        return withNumShards(numShards, forceReshard);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. If forceReshard is true, the output
-       * will be shuffled to obtain the desired sharding. If it is false,
-       * data will not be reshuffled, but parallelism of preceeding stages
-       * may be constrained. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @param forceReshard whether to force a reshard to obtain the desired 
sharding.
-       * @see ShardNameTemplate
-       */
-      private Bound<T> withNumShards(int numShards, boolean forceReshard) {
         Preconditions.checkArgument(numShards >= 0);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
             shardTemplate, validate);
       }
 
@@ -592,7 +567,7 @@ public class TextIO {
        * @see ShardNameTemplate
        */
       public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
             shardTemplate, validate);
       }
 
@@ -610,25 +585,7 @@ public class TextIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutSharding() {
-        return withoutSharding(forceReshard);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Using this setting is not recommended
-       * unless you truly require a single output file.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      private Bound<T> withoutSharding(boolean forceReshard) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, 
forceReshard, "",
-            validate);
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", 
validate);
       }
 
       /**
@@ -640,7 +597,7 @@ public class TextIO {
        * @param <X> the type of the elements of the input {@link PCollection}
        */
       public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
             shardTemplate, validate);
       }
 
@@ -655,7 +612,7 @@ public class TextIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutValidation() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards, forceReshard,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 
numShards,
             shardTemplate, false);
       }
 
@@ -665,14 +622,13 @@ public class TextIO {
           throw new IllegalStateException(
               "need to set the filename prefix of a TextIO.Write transform");
         }
-        if (numShards > 0 && forceReshard) {
-          // Reshard and re-apply a version of this write without resharding.
-          return input
-              .apply(new FileBasedSink.ReshardForWrite<T>())
-              .apply(withNumShards(numShards, false));
-        } else {
-          return PDone.in(input.getPipeline());
-        }
+
+        // Note that custom sinks currently do not expose sharding controls.
+        // Thus pipeline runner writers need to individually add support 
internally to
+        // apply user requested sharding limits.
+        return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
+            new TextSink<>(
+                filenamePrefix, filenameSuffix, shardTemplate, coder)));
       }
 
       /**
@@ -710,17 +666,6 @@ public class TextIO {
       public boolean needsValidation() {
         return validate;
       }
-
-      static {
-        DirectPipelineRunner.registerDefaultTransformEvaluator(
-            Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
-              @Override
-              public void evaluate(
-                  Bound transform, DirectPipelineRunner.EvaluationContext 
context) {
-                evaluateWriteHelper(transform, context);
-              }
-            });
-      }
     }
   }
 
@@ -978,24 +923,70 @@ public class TextIO {
     }
   }
 
-  private static <T> void evaluateWriteHelper(
-      Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext 
context) {
-    List<T> elems = context.getPCollection(context.getInput(transform));
-    int numShards = transform.numShards;
-    if (numShards < 1) {
-      // System gets to choose. For direct mode, choose 1.
-      numShards = 1;
+  /**
+   * A {@link FileBasedSink} for text files. Produces text files with the new 
line separator
+   * {@code '\n'} represented in {@code UTF-8} format as the record separator.
+   * Each record (including the last) is terminated.
+   */
+  @VisibleForTesting
+  static class TextSink<T> extends FileBasedSink<T> {
+    private final Coder<T> coder;
+
+    @VisibleForTesting
+    TextSink(
+        String baseOutputFilename, String extension, String fileNameTemplate, 
Coder<T> coder) {
+      super(baseOutputFilename, extension, fileNameTemplate);
+      this.coder = coder;
     }
-    TextSink<WindowedValue<T>> writer = TextSink.createForDirectPipelineRunner(
-        transform.filenamePrefix, transform.getShardNameTemplate(), 
transform.filenameSuffix,
-        numShards, true, null, null, transform.coder);
-    try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) {
-      for (T elem : elems) {
-        sink.add(WindowedValue.valueInGlobalWindow(elem));
-      }
-    } catch (IOException exn) {
-      throw new RuntimeException(
-          "unable to write to output file \"" + transform.filenamePrefix + 
"\"", exn);
+
+    @Override
+    public FileBasedSink.FileBasedWriteOperation<T> 
createWriteOperation(PipelineOptions options) {
+      return new TextWriteOperation<>(this, coder);
+    }
+
+    /**
+     * A {@link 
com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation
+     * FileBasedWriteOperation} for text files.
+     */
+    private static class TextWriteOperation<T> extends 
FileBasedWriteOperation<T> {
+      private final Coder<T> coder;
+
+      private TextWriteOperation(TextSink<T> sink, Coder<T> coder) {
+        super(sink);
+        this.coder = coder;
+      }
+
+      @Override
+      public FileBasedWriter<T> createWriter(PipelineOptions options) throws 
Exception {
+        return new TextWriter<>(this, coder);
+      }
+    }
+
+    /**
+     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter 
FileBasedWriter}
+     * for text files.
+     */
+    private static class TextWriter<T> extends FileBasedWriter<T> {
+      private static final byte[] NEWLINE = 
"\n".getBytes(StandardCharsets.UTF_8);
+      private final Coder<T> coder;
+      private OutputStream out;
+
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> 
coder) {
+        super(writeOperation);
+        this.mimeType = MimeTypes.TEXT;
+        this.coder = coder;
+      }
+
+      @Override
+      protected void prepareWrite(WritableByteChannel channel) throws 
Exception {
+        out = Channels.newOutputStream(channel);
+      }
+
+      @Override
+      public void write(T value) throws Exception {
+        coder.encode(value, out, Context.OUTER);
+        out.write(NEWLINE);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 5a57f7f..396d308 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -339,6 +339,7 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
       builder.put(Window.Bound.class, AssignWindows.class);
       builder.put(Write.Bound.class, BatchWrite.class);
       builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
+      builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class);
       if (options.getExperiments() == null
           || !options.getExperiments().contains("disable_ism_side_input")) {
         builder.put(View.AsMap.class, BatchViewAsMap.class);
@@ -1995,6 +1996,111 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
 
   /**
    * Specialized implementation which overrides
+   * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write.Bound 
TextIO.Write.Bound} with
+   * a native sink instead of a custom sink as workaround until custom sinks
+   * have support for sharding controls.
+   */
+  private static class BatchTextIOWrite<T> extends PTransform<PCollection<T>, 
PDone> {
+    private final TextIO.Write.Bound<T> transform;
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in 
DataflowPipelineRunner#apply()
+    public BatchTextIOWrite(DataflowPipelineRunner runner, 
TextIO.Write.Bound<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      if (transform.getNumShards() > 0) {
+        return input
+            .apply(new ReshardForWrite<T>())
+            .apply(new BatchTextIONativeWrite<>(transform));
+      } else {
+        return transform.apply(input);
+      }
+    }
+  }
+
+  /**
+   * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} 
as a way
+   * to provide the native definition of the Text sink.
+   */
+  private static class BatchTextIONativeWrite<T> extends 
PTransform<PCollection<T>, PDone> {
+    private final TextIO.Write.Bound<T> transform;
+    public BatchTextIONativeWrite(TextIO.Write.Bound<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      return PDone.in(input.getPipeline());
+    }
+
+    static {
+      DataflowPipelineTranslator.registerTransformTranslator(
+          BatchTextIONativeWrite.class, new 
BatchTextIONativeWriteTranslator());
+    }
+  }
+
+  /**
+   * TextIO.Write.Bound support code for the Dataflow backend when applying 
parallelism limits
+   * through user requested sharding limits.
+   */
+  private static class BatchTextIONativeWriteTranslator
+      implements TransformTranslator<BatchTextIONativeWrite<?>> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void translate(@SuppressWarnings("rawtypes") BatchTextIONativeWrite 
transform,
+        TranslationContext context) {
+      translateWriteHelper(transform, transform.transform, context);
+    }
+
+    private <T> void translateWriteHelper(
+        BatchTextIONativeWrite<T> transform,
+        TextIO.Write.Bound<T> originalTransform,
+        TranslationContext context) {
+      // Note that the original transform can not be used during add step/add 
input
+      // and is only passed in to get properties from it.
+
+      checkState(originalTransform.getNumShards() > 0,
+          "Native TextSink is expected to only be used when sharding controls 
are required.");
+
+      context.addStep(transform, "ParallelWrite");
+      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+
+      // TODO: drop this check when server supports alternative templates.
+      switch (originalTransform.getShardTemplate()) {
+        case ShardNameTemplate.INDEX_OF_MAX:
+          break;  // supported by server
+        case "":
+          // Empty shard template allowed - forces single output.
+          Preconditions.checkArgument(originalTransform.getNumShards() <= 1,
+              "Num shards must be <= 1 when using an empty sharding template");
+          break;
+        default:
+          throw new UnsupportedOperationException("Shard template "
+              + originalTransform.getShardTemplate()
+              + " not yet supported by Dataflow service");
+      }
+
+      // TODO: How do we want to specify format and
+      // format-specific properties?
+      context.addInput(PropertyNames.FORMAT, "text");
+      context.addInput(PropertyNames.FILENAME_PREFIX, 
originalTransform.getFilenamePrefix());
+      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE,
+          originalTransform.getShardNameTemplate());
+      context.addInput(PropertyNames.FILENAME_SUFFIX, 
originalTransform.getFilenameSuffix());
+      context.addInput(PropertyNames.VALIDATE_SINK, 
originalTransform.needsValidation());
+      context.addInput(PropertyNames.NUM_SHARDS, (long) 
originalTransform.getNumShards());
+      context.addEncodingInput(
+          WindowedValue.getValueOnlyCoder(originalTransform.getCoder()));
+
+    }
+  }
+
+  /**
+   * Specialized implementation which overrides
    * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound 
AvroIO.Write.Bound} with
    * a native sink instead of a custom sink as workaround until custom sinks
    * have support for sharding controls.
@@ -2088,9 +2194,7 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
       context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, 
originalTransform.getShardTemplate());
       context.addInput(PropertyNames.FILENAME_SUFFIX, 
originalTransform.getFilenameSuffix());
       context.addInput(PropertyNames.VALIDATE_SINK, 
originalTransform.needsValidation());
-
       context.addInput(PropertyNames.NUM_SHARDS, (long) 
originalTransform.getNumShards());
-
       context.addEncodingInput(
           WindowedValue.getValueOnlyCoder(
               AvroCoder.of(originalTransform.getType(), 
originalTransform.getSchema())));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index 885260e..22ec3bb 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -44,14 +44,12 @@ import com.google.cloud.dataflow.sdk.coders.IterableCoder;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO;
 import com.google.cloud.dataflow.sdk.io.PubsubIO;
 import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.StreamingOptions;
 import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
 import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
-import com.google.cloud.dataflow.sdk.runners.dataflow.TextIOTranslator;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -1037,9 +1035,6 @@ public class DataflowPipelineTranslator {
         DataflowPipelineRunner.StreamingPubsubIOWrite.class,
         new PubsubIOTranslator.WriteTranslator());
 
-    registerTransformTranslator(
-        TextIO.Write.Bound.class, new TextIOTranslator.WriteTranslator());
-
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
deleted file mode 100644
index d6c96c3..0000000
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.common.base.Preconditions;
-
-/**
- * TextIO transform support code for the Dataflow backend.
- */
-public class TextIOTranslator {
-  /**
-   * Implements TextIO Write translation for the Dataflow backend.
-   */
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public static class WriteTranslator implements 
TransformTranslator<TextIO.Write.Bound> {
-    @Override
-    public void translate(
-        TextIO.Write.Bound transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        TextIO.Write.Bound<T> transform,
-        TranslationContext context) {
-      if (context.getPipelineOptions().isStreaming()) {
-        throw new IllegalArgumentException("TextIO not supported in streaming 
mode.");
-      }
-
-      PathValidator validator = 
context.getPipelineOptions().getPathValidator();
-      String filenamePrefix = validator.validateOutputFilePrefixSupported(
-          transform.getFilenamePrefix());
-
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-
-      // TODO: drop this check when server supports alternative templates.
-      switch (transform.getShardTemplate()) {
-        case ShardNameTemplate.INDEX_OF_MAX:
-          break;  // supported by server
-        case "":
-          // Empty shard template allowed - forces single output.
-          Preconditions.checkArgument(transform.getNumShards() <= 1,
-              "Num shards must be <= 1 when using an empty sharding template");
-          break;
-        default:
-          throw new UnsupportedOperationException("Shard template "
-              + transform.getShardTemplate()
-              + " not yet supported by Dataflow service");
-      }
-
-      // TODO: How do we want to specify format and
-      // format-specific properties?
-      context.addInput(PropertyNames.FORMAT, "text");
-      context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix);
-      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE,
-          transform.getShardNameTemplate());
-      context.addInput(PropertyNames.FILENAME_SUFFIX, 
transform.getFilenameSuffix());
-      context.addInput(PropertyNames.VALIDATE_SINK, 
transform.needsValidation());
-
-      long numShards = transform.getNumShards();
-      if (numShards > 0) {
-        context.addInput(PropertyNames.NUM_SHARDS, numShards);
-      }
-
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(transform.getCoder()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
index 6ad81e4..0a8e381 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
@@ -264,28 +264,6 @@ public class TextIOTest {
   }
 
   @Test
-  public void testWriteSharded() throws IOException {
-    File outFolder = tmpFolder.newFolder();
-    String filename = outFolder.toPath().resolve("output").toString();
-
-    Pipeline p = TestPipeline.create();
-
-    PCollection<String> input =
-        p.apply(Create.of(Arrays.asList(LINES_ARRAY))
-            .withCoder(StringUtf8Coder.of()));
-
-    input.apply(TextIO.Write.to(filename).withNumShards(2).withSuffix(".txt"));
-
-    p.run();
-
-    String[] files = outFolder.list();
-
-    assertThat(Arrays.asList(files),
-        containsInAnyOrder("output-00000-of-00002.txt",
-                           "output-00001-of-00002.txt"));
-  }
-
-  @Test
   public void testWriteNamed() {
     {
       PTransform<PCollection<String>, PDone> transform1 =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
index c7175cb..c5f2d3f 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
@@ -453,16 +453,12 @@ public class DataflowPipelineRunnerTest {
 
   @Test
   public void testNonGcsFilePathInWriteFailure() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+    PCollection<String> pc = 
p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("expected a valid 'gs://' path but was 
given"));
-    p.run();
-    assertValidJob(jobCaptor.getValue());
+    pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
   }
 
   @Test
@@ -482,17 +478,12 @@ public class DataflowPipelineRunnerTest {
 
   @Test
   public void testMultiSlashGcsFileWritePath() throws IOException {
-    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-
-    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteInvalidGcsFile")
-            .to("gs://bucket/tmp//file"));
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("consecutive slashes");
-    p.run();
-    assertValidJob(jobCaptor.getValue());
+    pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
index b9c94ad..72090a0 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -403,7 +403,7 @@ public class DataflowPipelineTranslatorTest {
     pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
         .apply(ParDo.of(new NoOpFn()))
         .apply(new EmbeddedTransform(predefinedStep.clone()))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
+        .apply(ParDo.of(new NoOpFn()));
     Job job = translator.translate(
         pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
 
@@ -456,7 +456,7 @@ public class DataflowPipelineTranslatorTest {
     Job job = translator.translate(
         pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob();
 
-    assertEquals(3, job.getSteps().size());
+    assertEquals(13, job.getSteps().size());
     Step step = job.getSteps().get(1);
     assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
     return step;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
index f1b7cd7..68e1db1 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java
@@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.io.Write;
 import com.google.cloud.dataflow.sdk.transforms.Count;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
@@ -133,9 +134,12 @@ public class TransformTreeTest {
           assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
+        } else if (transform instanceof Write.Bound) {
+          assertTrue(visited.add(TransformsSeen.WRITE));
+          assertNotNull(node.getEnclosingNode());
+          assertTrue(node.isCompositeNode());
         }
         assertThat(transform, not(instanceOf(Read.Bounded.class)));
-        assertThat(transform, not(instanceOf(TextIO.Write.Bound.class)));
       }
 
       @Override
@@ -151,10 +155,9 @@ public class TransformTreeTest {
         PTransform<?, ?> transform = node.getTransform();
         // Pick is a composite, should not be visited here.
         assertThat(transform, not(instanceOf(Sample.SampleAny.class)));
+        assertThat(transform, not(instanceOf(Write.Bound.class)));
         if (transform instanceof Read.Bounded) {
           assertTrue(visited.add(TransformsSeen.READ));
-        } else if (transform instanceof TextIO.Write.Bound) {
-          assertTrue(visited.add(TransformsSeen.WRITE));
         }
       }
 

Reply via email to