Revert "Migrate AvroIO.Write to a custom sink"

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115355272


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e89a4b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e89a4b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e89a4b3

Branch: refs/heads/master
Commit: 2e89a4b3ddfd5d395aa8d6c4f73b501712b907a7
Parents: 635541a
Author: sgmc <[email protected]>
Authored: Tue Feb 23 10:21:54 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:26 2016 -0800

----------------------------------------------------------------------
 .../google/cloud/dataflow/sdk/io/AvroIO.java    | 188 ++++++++++---------
 .../sdk/runners/DataflowPipelineRunner.java     | 179 ------------------
 .../sdk/runners/DataflowPipelineTranslator.java |   5 +
 .../sdk/runners/dataflow/AvroIOTranslator.java  |  87 +++++++++
 .../sdk/io/AvroIOGeneratedClassTest.java        |  48 ++---
 .../cloud/dataflow/sdk/io/AvroIOTest.java       |  45 -----
 6 files changed, 215 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
index 7042010..9ee7e6b 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
@@ -22,25 +22,24 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.worker.AvroSink;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MimeTypes;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PDone;
 import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
 
 import java.io.IOException;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import javax.annotation.Nullable;
@@ -318,7 +317,7 @@ public class AvroIO {
                 : com.google.cloud.dataflow.sdk.io.Read.from(
                     AvroSource.from(filepattern).withSchema(type));
 
-        PCollection<T> pcol = input.getPipeline().apply("Read", read);
+        PCollection<T> pcol = input.getPipeline().apply(read);
         // Honor the default output coder that would have been used by this 
PTransform.
         pcol.setCoder(getDefaultOutputCoder());
         return pcol;
@@ -474,6 +473,8 @@ public class AvroIO {
       final int numShards;
       /** Shard template string. */
       final String shardTemplate;
+      /** Insert a shuffle before writing to decouple parallelism when 
numShards != 0. */
+      final boolean forceReshard;
       /** The class type of the records. */
       final Class<T> type;
       /** The schema of the output file. */
@@ -483,16 +484,18 @@ public class AvroIO {
       final boolean validate;
 
       Bound(Class<T> type) {
-        this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, type, null, 
true);
+        this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, true, type, 
null, true);
       }
 
       Bound(String name, String filenamePrefix, String filenameSuffix, int 
numShards,
-          String shardTemplate, Class<T> type, Schema schema, boolean 
validate) {
+          String shardTemplate, boolean forceReshard, Class<T> type, Schema 
schema,
+          boolean validate) {
         super(name);
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
         this.numShards = numShards;
         this.shardTemplate = shardTemplate;
+        this.forceReshard = forceReshard;
         this.type = type;
         this.schema = schema;
         this.validate = validate;
@@ -506,7 +509,7 @@ public class AvroIO {
        */
       public Bound<T> named(String name) {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate,
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
             type, schema, validate);
       }
 
@@ -522,7 +525,7 @@ public class AvroIO {
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate,
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
             type, schema, validate);
       }
 
@@ -537,7 +540,7 @@ public class AvroIO {
       public Bound<T> withSuffix(String filenameSuffix) {
         validateOutputComponent(filenameSuffix);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
type,
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard, type,
             schema, validate);
       }
 
@@ -556,9 +559,31 @@ public class AvroIO {
        * @see ShardNameTemplate
        */
       public Bound<T> withNumShards(int numShards) {
+        return withNumShards(numShards, forceReshard);
+      }
+
+      /**
+       * Returns a new {@link PTransform} that's like this one but
+       * that uses the provided shard count.
+       *
+       * <p>Constraining the number of shards is likely to reduce
+       * the performance of a pipeline. If forceReshard is true, the output
+       * will be shuffled to obtain the desired sharding. If it is false,
+       * data will not be reshuffled, but parallelism of preceding stages
+       * may be constrained. Setting this value is not recommended
+       * unless you require a specific number of output files.
+       *
+       * <p>Does not modify this object.
+       *
+       * @param numShards the number of shards to use, or 0 to let the system
+       *                  decide.
+       * @param forceReshard whether to force a reshard to obtain the desired 
sharding.
+       * @see ShardNameTemplate
+       */
+      private Bound<T> withNumShards(int numShards, boolean forceReshard) {
         Preconditions.checkArgument(numShards >= 0);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate,
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
             type, schema, validate);
       }
 
@@ -572,7 +597,7 @@ public class AvroIO {
        */
       public Bound<T> withShardNameTemplate(String shardTemplate) {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate,
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
             type, schema, validate);
       }
 
@@ -586,7 +611,22 @@ public class AvroIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutSharding() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "",
+        return withoutSharding(forceReshard);
+      }
+
+      /**
+       * Returns a new {@link PTransform} that's like this one but
+       * that forces a single file as output.
+       *
+       * <p>This is a shortcut for
+       * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")}
+       *
+       * <p>Does not modify this object.
+       *
+       * @param forceReshard whether to force a reshard to obtain the desired 
sharding.
+       */
+      private Bound<T> withoutSharding(boolean forceReshard) {
+        return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", 
forceReshard,
             type, schema, validate);
       }
 
@@ -601,7 +641,7 @@ public class AvroIO {
        */
       public <X> Bound<X> withSchema(Class<X> type) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, 
shardTemplate,
-            type, ReflectData.get().getSchema(type), validate);
+            forceReshard, type, ReflectData.get().getSchema(type), validate);
       }
 
       /**
@@ -613,7 +653,7 @@ public class AvroIO {
        */
       public Bound<GenericRecord> withSchema(Schema schema) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, 
shardTemplate,
-            GenericRecord.class, schema, validate);
+            forceReshard, GenericRecord.class, schema, validate);
       }
 
       /**
@@ -639,7 +679,7 @@ public class AvroIO {
        */
       public Bound<T> withoutValidation() {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate,
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
             type, schema, false);
       }
 
@@ -653,12 +693,14 @@ public class AvroIO {
           throw new IllegalStateException("need to set the schema of an 
AvroIO.Write transform");
         }
 
-        // Note that custom sinks currently do not expose sharding controls.
-        // Thus pipeline runner writers need to individually add support 
internally to
-        // apply user requested sharding limits.
-        return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
-            new AvroSink<>(
-                filenamePrefix, filenameSuffix, shardTemplate, 
AvroCoder.of(type, schema))));
+        if (numShards > 0 && forceReshard) {
+          // Reshard and re-apply a version of this write without resharding.
+          return input
+              .apply(new FileBasedSink.ReshardForWrite<T>())
+              .apply(withNumShards(numShards, false));
+        } else {
+          return PDone.in(input.getPipeline());
+        }
       }
 
       /**
@@ -700,6 +742,21 @@ public class AvroIO {
       public boolean needsValidation() {
         return validate;
       }
+
+      static {
+        @SuppressWarnings("rawtypes")
+        DirectPipelineRunner.TransformEvaluator<Bound> transformEvaluator =
+            new DirectPipelineRunner.TransformEvaluator<Bound>() {
+          @Override
+          @SuppressWarnings("unchecked")
+          public void evaluate(
+              Bound transform, DirectPipelineRunner.EvaluationContext context) 
{
+            evaluateWriteHelper(transform, context);
+          }
+        };
+        DirectPipelineRunner.registerDefaultTransformEvaluator(
+            Bound.class, transformEvaluator);
+      }
     }
 
     /** Disallow construction of utility class. */
@@ -722,72 +779,25 @@ public class AvroIO {
   /** Disallow construction of utility class. */
   private AvroIO() {}
 
-  /**
-   * A {@link FileBasedSink} for Avro files.
-   */
-  @VisibleForTesting
-  static class AvroSink<T> extends FileBasedSink<T> {
-    private final AvroCoder<T> coder;
-
-    @VisibleForTesting
-    AvroSink(
-        String baseOutputFilename, String extension, String fileNameTemplate, 
AvroCoder<T> coder) {
-      super(baseOutputFilename, extension, fileNameTemplate);
-      this.coder = coder;
-    }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<T> 
createWriteOperation(PipelineOptions options) {
-      return new AvroWriteOperation<>(this, coder);
-    }
-
-    /**
-     * A {@link 
com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for Avro files.
-     */
-    private static class AvroWriteOperation<T> extends 
FileBasedWriteOperation<T> {
-      private final AvroCoder<T> coder;
-
-      private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder) {
-        super(sink);
-        this.coder = coder;
-      }
-
-      @Override
-      public FileBasedWriter<T> createWriter(PipelineOptions options) throws 
Exception {
-        return new AvroWriter<>(this, coder);
-      }
+  private static <T> void evaluateWriteHelper(
+      Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext 
context) {
+    List<WindowedValue<T>> elems =
+        context.getPCollectionWindowedValues(context.getInput(transform));
+    int numShards = transform.numShards;
+    if (numShards < 1) {
+      // System gets to choose. For direct mode, choose 1.
+      numShards = 1;
     }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter 
FileBasedWriter}
-     * for Avro files.
-     */
-    private static class AvroWriter<T> extends FileBasedWriter<T> {
-      private final AvroCoder<T> coder;
-      private DataFileWriter<T> dataFileWriter;
-
-      public AvroWriter(FileBasedWriteOperation<T> writeOperation, 
AvroCoder<T> coder) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.BINARY;
-        this.coder = coder;
-      }
-
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws 
Exception {
-        dataFileWriter = new DataFileWriter<>(coder.createDatumWriter());
-        dataFileWriter.create(coder.getSchema(), 
Channels.newOutputStream(channel));
-      }
-
-      @Override
-      public void write(T value) throws Exception {
-        dataFileWriter.append(value);
-      }
-
-      @Override
-      protected void writeFooter() throws Exception {
-        dataFileWriter.close();
-      }
+    AvroSink<T> writer = new AvroSink<>(transform.filenamePrefix, 
transform.shardTemplate,
+        transform.filenameSuffix, numShards,
+        WindowedValue.getValueOnlyCoder(AvroCoder.of(transform.type, 
transform.schema)));
+    try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) {
+      for (WindowedValue<T> elem : elems) {
+        sink.add(elem);
+      }
+    } catch (IOException exn) {
+      throw new RuntimeException(
+          "unable to write to output file \"" + transform.filenamePrefix + 
"\"", exn);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 5a57f7f..f20caa3 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -32,7 +32,6 @@ import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
 import com.google.cloud.dataflow.sdk.PipelineResult.State;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -49,10 +48,8 @@ import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
 import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
 import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink;
 import com.google.cloud.dataflow.sdk.io.PubsubIO;
 import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
 import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.io.Write;
@@ -63,8 +60,6 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
 import com.google.cloud.dataflow.sdk.options.StreamingOptions;
 import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification;
-import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
 import com.google.cloud.dataflow.sdk.runners.dataflow.AssignWindows;
 import 
com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
 import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
@@ -88,7 +83,6 @@ import 
com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
 import com.google.cloud.dataflow.sdk.transforms.WithKeys;
 import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
@@ -337,8 +331,6 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
       ImmutableMap.Builder<Class<?>, Class<?>> builder = 
ImmutableMap.<Class<?>, Class<?>>builder();
       builder.put(Read.Unbounded.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
-      builder.put(Write.Bound.class, BatchWrite.class);
-      builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
       if (options.getExperiments() == null
           || !options.getExperiments().contains("disable_ism_side_input")) {
         builder.put(View.AsMap.class, BatchViewAsMap.class);
@@ -1927,177 +1919,6 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
   }
 
   /**
-   * A {@link PTransform} that uses shuffle to create a fusion break. This 
allows pushing
-   * parallelism limits such as sharding controls further down the pipeline.
-   */
-  private static class ReshardForWrite<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      return input
-          // TODO: This would need to be adapted to write per-window shards.
-          .apply(Window.<T>into(new GlobalWindows())
-                       .triggering(DefaultTrigger.of())
-                       .discardingFiredPanes())
-          .apply("RandomKey", ParDo.of(
-              new DoFn<T, KV<Long, T>>() {
-                transient long counter, step;
-                @Override
-                public void startBundle(Context c) {
-                  counter = (long) (Math.random() * Long.MAX_VALUE);
-                  step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
-                }
-                @Override
-                public void processElement(ProcessContext c) {
-                  counter += step;
-                  c.output(KV.of(counter, c.element()));
-                }
-              }))
-          .apply(GroupByKey.<Long, T>create())
-          .apply("Ungroup", ParDo.of(
-              new DoFn<KV<Long, Iterable<T>>, T>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  for (T item : c.element().getValue()) {
-                    c.output(item);
-                  }
-                }
-              }));
-    }
-  }
-
-  /**
-   * Specialized implementation which overrides
-   * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound} to 
provide Google
-   * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
-   */
-  private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> 
{
-    private final DataflowPipelineRunner runner;
-    private final Write.Bound<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowPipelineRunner#apply()
-    public BatchWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) 
{
-      this.runner = runner;
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      if (transform.getSink() instanceof FileBasedSink) {
-        FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
-        PathValidator validator = runner.options.getPathValidator();
-        
validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
-      }
-      return transform.apply(input);
-    }
-  }
-
-  /**
-   * Specialized implementation which overrides
-   * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound 
AvroIO.Write.Bound} with
-   * a native sink instead of a custom sink as workaround until custom sinks
-   * have support for sharding controls.
-   */
-  private static class BatchAvroIOWrite<T> extends PTransform<PCollection<T>, 
PDone> {
-    private final AvroIO.Write.Bound<T> transform;
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowPipelineRunner#apply()
-    public BatchAvroIOWrite(DataflowPipelineRunner runner, 
AvroIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      if (transform.getNumShards() > 0) {
-        return input
-            .apply(new ReshardForWrite<T>())
-            .apply(new BatchAvroIONativeWrite<>(transform));
-      } else {
-        return transform.apply(input);
-      }
-    }
-  }
-
-  /**
-   * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} 
as a way
-   * to provide the native definition of the Avro sink.
-   */
-  private static class BatchAvroIONativeWrite<T> extends 
PTransform<PCollection<T>, PDone> {
-    private final AvroIO.Write.Bound<T> transform;
-    public BatchAvroIONativeWrite(AvroIO.Write.Bound<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      return PDone.in(input.getPipeline());
-    }
-
-    static {
-      DataflowPipelineTranslator.registerTransformTranslator(
-          BatchAvroIONativeWrite.class, new 
BatchAvroIONativeWriteTranslator());
-    }
-  }
-
-  /**
-   * AvroIO.Write.Bound support code for the Dataflow backend when applying 
parallelism limits
-   * through user requested sharding limits.
-   */
-  private static class BatchAvroIONativeWriteTranslator
-      implements TransformTranslator<BatchAvroIONativeWrite<?>> {
-    @SuppressWarnings("unchecked")
-    @Override
-    public void translate(@SuppressWarnings("rawtypes") BatchAvroIONativeWrite 
transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, transform.transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        BatchAvroIONativeWrite<T> transform,
-        AvroIO.Write.Bound<T> originalTransform,
-        TranslationContext context) {
-      // Note that the original transform can not be used during add step/add 
input
-      // and is only passed in to get properties from it.
-
-      checkState(originalTransform.getNumShards() > 0,
-          "Native AvroSink is expected to only be used when sharding controls 
are required.");
-
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-
-      // TODO: drop this check when server supports alternative templates.
-      switch (originalTransform.getShardTemplate()) {
-        case ShardNameTemplate.INDEX_OF_MAX:
-          break;  // supported by server
-        case "":
-          // Empty shard template allowed - forces single output.
-          Preconditions.checkArgument(originalTransform.getNumShards() <= 1,
-              "Num shards must be <= 1 when using an empty sharding template");
-          break;
-        default:
-          throw new UnsupportedOperationException("Shard template "
-              + originalTransform.getShardTemplate()
-              + " not yet supported by Dataflow service");
-      }
-
-      context.addInput(PropertyNames.FORMAT, "avro");
-      context.addInput(PropertyNames.FILENAME_PREFIX, 
originalTransform.getFilenamePrefix());
-      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, 
originalTransform.getShardTemplate());
-      context.addInput(PropertyNames.FILENAME_SUFFIX, 
originalTransform.getFilenameSuffix());
-      context.addInput(PropertyNames.VALIDATE_SINK, 
originalTransform.needsValidation());
-
-      context.addInput(PropertyNames.NUM_SHARDS, (long) 
originalTransform.getNumShards());
-
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(
-              AvroCoder.of(originalTransform.getType(), 
originalTransform.getSchema())));
-    }
-  }
-
-  /**
    * Specialized (non-)implementation for
    * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound}
    * for the Dataflow runner in streaming mode.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index 885260e..f7217f7 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.coders.IterableCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO;
 import com.google.cloud.dataflow.sdk.io.PubsubIO;
 import com.google.cloud.dataflow.sdk.io.Read;
@@ -48,6 +49,7 @@ import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.StreamingOptions;
 import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
+import com.google.cloud.dataflow.sdk.runners.dataflow.AvroIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
@@ -1027,6 +1029,9 @@ public class DataflowPipelineTranslator {
     // IO Translation.
 
     registerTransformTranslator(
+        AvroIO.Write.Bound.class, new AvroIOTranslator.WriteTranslator());
+
+    registerTransformTranslator(
         BigQueryIO.Read.Bound.class, new 
BigQueryIOTranslator.ReadTranslator());
     registerTransformTranslator(
         BigQueryIO.Write.Bound.class, new 
BigQueryIOTranslator.WriteTranslator());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
new file mode 100644
index 0000000..b114021
--- /dev/null
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.runners.dataflow;
+
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
+import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
+import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
+import com.google.cloud.dataflow.sdk.util.PathValidator;
+import com.google.cloud.dataflow.sdk.util.PropertyNames;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.common.base.Preconditions;
+
+/**
+ * Avro transform support code for the Dataflow backend.
+ */
+public class AvroIOTranslator {
+
+  /**
+   * Implements AvroIO Write translation for the Dataflow backend.
+   */
+  @SuppressWarnings("rawtypes")
+  public static class WriteTranslator implements 
TransformTranslator<AvroIO.Write.Bound> {
+
+    @Override
+    public void translate(
+        AvroIO.Write.Bound transform,
+        TranslationContext context) {
+      translateWriteHelper(transform, context);
+    }
+
+    private <T> void translateWriteHelper(
+        AvroIO.Write.Bound<T> transform,
+        TranslationContext context) {
+      PathValidator validator = 
context.getPipelineOptions().getPathValidator();
+      String filenamePrefix = validator.validateOutputFilePrefixSupported(
+          transform.getFilenamePrefix());
+      context.addStep(transform, "ParallelWrite");
+      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+
+      // TODO: drop this check when server supports alternative templates.
+      switch (transform.getShardTemplate()) {
+        case ShardNameTemplate.INDEX_OF_MAX:
+          break;  // supported by server
+        case "":
+          // Empty shard template allowed - forces single output.
+          Preconditions.checkArgument(transform.getNumShards() <= 1,
+              "Num shards must be <= 1 when using an empty sharding template");
+          break;
+        default:
+          throw new UnsupportedOperationException("Shard template "
+              + transform.getShardTemplate()
+              + " not yet supported by Dataflow service");
+      }
+
+      context.addInput(PropertyNames.FORMAT, "avro");
+      context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix);
+      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, 
transform.getShardTemplate());
+      context.addInput(PropertyNames.FILENAME_SUFFIX, 
transform.getFilenameSuffix());
+      context.addInput(PropertyNames.VALIDATE_SINK, 
transform.needsValidation());
+
+      long numShards = transform.getNumShards();
+      if (numShards > 0) {
+        context.addInput(PropertyNames.NUM_SHARDS, numShards);
+      }
+
+      context.addEncodingInput(
+          WindowedValue.getValueOnlyCoder(
+              AvroCoder.of(transform.getType(), transform.getSchema())));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
index 927d3a3..6bb459d 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
@@ -148,102 +148,102 @@ public class AvroIOGeneratedClassTest {
   public void testReadFromGeneratedClass() throws Exception {
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .withSchema(AvroGeneratedUser.class),
-                "AvroIO.Read/Read.out", generateAvroObjects());
+                "AvroIO.Read/Read(AvroSource).out", generateAvroObjects());
     runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class)
                            .from(avroFile.getPath()),
-                "AvroIO.Read/Read.out", generateAvroObjects());
+                "AvroIO.Read/Read(AvroSource).out", generateAvroObjects());
     runTestRead(AvroIO.Read.named("MyRead")
                            .from(avroFile.getPath())
                            .withSchema(AvroGeneratedUser.class),
-                "MyRead/Read.out", generateAvroObjects());
+                "MyRead/Read(AvroSource).out", generateAvroObjects());
     runTestRead(AvroIO.Read.named("MyRead")
                            .withSchema(AvroGeneratedUser.class)
                            .from(avroFile.getPath()),
-                "MyRead/Read.out", generateAvroObjects());
+                "MyRead/Read(AvroSource).out", generateAvroObjects());
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .withSchema(AvroGeneratedUser.class)
                            .named("HerRead"),
-                "HerRead/Read.out", generateAvroObjects());
+                "HerRead/Read(AvroSource).out", generateAvroObjects());
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .named("HerRead")
                            .withSchema(AvroGeneratedUser.class),
-                "HerRead/Read.out", generateAvroObjects());
+                "HerRead/Read(AvroSource).out", generateAvroObjects());
     runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class)
                            .named("HerRead")
                            .from(avroFile.getPath()),
-                "HerRead/Read.out", generateAvroObjects());
+                "HerRead/Read(AvroSource).out", generateAvroObjects());
     runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class)
                            .from(avroFile.getPath())
                            .named("HerRead"),
-                "HerRead/Read.out", generateAvroObjects());
+                "HerRead/Read(AvroSource).out", generateAvroObjects());
   }
 
   @Test
   public void testReadFromSchema() throws Exception {
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .withSchema(schema),
-                "AvroIO.Read/Read.out", generateAvroGenericRecords());
+                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.withSchema(schema)
                            .from(avroFile.getPath()),
-                "AvroIO.Read/Read.out", generateAvroGenericRecords());
+                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.named("MyRead")
                            .from(avroFile.getPath())
                            .withSchema(schema),
-                "MyRead/Read.out", generateAvroGenericRecords());
+                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.named("MyRead")
                            .withSchema(schema)
                            .from(avroFile.getPath()),
-                "MyRead/Read.out", generateAvroGenericRecords());
+                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .withSchema(schema)
                            .named("HerRead"),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .named("HerRead")
                            .withSchema(schema),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.withSchema(schema)
                            .named("HerRead")
                            .from(avroFile.getPath()),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.withSchema(schema)
                            .from(avroFile.getPath())
                            .named("HerRead"),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
   }
 
   @Test
   public void testReadFromSchemaString() throws Exception {
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .withSchema(schemaString),
-                "AvroIO.Read/Read.out", generateAvroGenericRecords());
+                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.withSchema(schemaString)
                            .from(avroFile.getPath()),
-                "AvroIO.Read/Read.out", generateAvroGenericRecords());
+                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.named("MyRead")
                            .from(avroFile.getPath())
                            .withSchema(schemaString),
-                "MyRead/Read.out", generateAvroGenericRecords());
+                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.named("MyRead")
                            .withSchema(schemaString)
                            .from(avroFile.getPath()),
-                "MyRead/Read.out", generateAvroGenericRecords());
+                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .withSchema(schemaString)
                            .named("HerRead"),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.from(avroFile.getPath())
                            .named("HerRead")
                            .withSchema(schemaString),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.withSchema(schemaString)
                            .named("HerRead")
                            .from(avroFile.getPath()),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
     runTestRead(AvroIO.Read.withSchema(schemaString)
                            .from(avroFile.getPath())
                            .named("HerRead"),
-                "HerRead/Read.out", generateAvroGenericRecords());
+                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
   }
 
   <T> void runTestWrite(AvroIO.Write.Bound<T> write, String expectedName)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
index 207b2ba..30c578b 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
@@ -16,28 +16,19 @@
 
 package com.google.cloud.dataflow.sdk.io;
 
-import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO.AvroSink;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
 import com.google.cloud.dataflow.sdk.runners.DirectPipeline;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
 
-import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.junit.Rule;
@@ -47,10 +38,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.File;
-import java.nio.channels.FileChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -202,38 +189,6 @@ public class AvroIOTest {
     p.run();
   }
 
-  @Test
-  public void testAvroSinkWrite() throws Exception {
-    String[] expectedElements = new String[]{ "first", "second", "third" };
-    PipelineOptions options = PipelineOptionsFactory.create();
-    AvroCoder<String> coder = AvroCoder.of(String.class);
-    File tmpFile = tmpFolder.newFile();
-    AvroSink<String> avroSink = new AvroSink<>(
-        "prefix", "suffix", ShardNameTemplate.INDEX_OF_MAX, coder);
-    FileBasedWriteOperation<String> writeOperation = avroSink.createWriteOperation(options);
-    FileBasedWriter<String> writer = writeOperation.createWriter(options);
-
-    WritableByteChannel channel = FileChannel.open(tmpFile.toPath(), StandardOpenOption.WRITE);
-    writer.prepareWrite(channel);
-    writer.writeHeader();
-    for (String element : expectedElements) {
-      writer.write(element);
-      // We expect the channel to remain open
-      assertTrue(channel.isOpen());
-    }
-
-    writer.close();
-    // Ensure that we properly close the channel
-    assertFalse(channel.isOpen());
-
-    // Validate that the data written matches the expected elements in the expected order
-    try (DataFileReader<String> reader = new DataFileReader<>(tmpFile, coder.createDatumReader())) {
-      List<String> actualElements = new ArrayList<>();
-      Iterators.addAll(actualElements, reader);
-      assertThat(actualElements, contains(expectedElements));
-    }
-  }
-
   // TODO: for Write only, test withSuffix, withNumShards,
   // withShardNameTemplate and withoutSharding.
 }


Reply via email to