Resubmit "Migrate AvroIO.Write to a custom sink"

Note: for user-requested sharding limits to be supported,
each pipeline runner must support applying those sharding limits.

Google Cloud Dataflow supports sharding limits.

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115402880


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01a0da02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01a0da02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01a0da02

Branch: refs/heads/master
Commit: 01a0da02daed5f1609237ae85c82fd056ea76339
Parents: 2e89a4b
Author: dhalperi <[email protected]>
Authored: Tue Feb 23 17:35:40 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:27 2016 -0800

----------------------------------------------------------------------
 .../google/cloud/dataflow/sdk/io/AvroIO.java    | 223 ++++++++++---------
 .../sdk/runners/DataflowPipelineRunner.java     | 189 ++++++++++++++++
 .../sdk/runners/DataflowPipelineTranslator.java |   5 -
 .../sdk/runners/dataflow/AvroIOTranslator.java  |  87 --------
 .../sdk/io/AvroIOGeneratedClassTest.java        | 186 ++++++++--------
 .../cloud/dataflow/sdk/io/AvroIOTest.java       |  34 ++-
 6 files changed, 433 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
index 9ee7e6b..f016b5b 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java
@@ -22,24 +22,25 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.worker.AvroSink;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
+import com.google.cloud.dataflow.sdk.util.MimeTypes;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PDone;
 import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
 
 import java.io.IOException;
-import java.util.List;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.regex.Pattern;
 
 import javax.annotation.Nullable;
@@ -317,7 +318,7 @@ public class AvroIO {
                 : com.google.cloud.dataflow.sdk.io.Read.from(
                     AvroSource.from(filepattern).withSchema(type));
 
-        PCollection<T> pcol = input.getPipeline().apply(read);
+        PCollection<T> pcol = input.getPipeline().apply("Read", read);
         // Honor the default output coder that would have been used by this 
PTransform.
         pcol.setCoder(getDefaultOutputCoder());
         return pcol;
@@ -473,8 +474,6 @@ public class AvroIO {
       final int numShards;
       /** Shard template string. */
       final String shardTemplate;
-      /** Insert a shuffle before writing to decouple parallelism when 
numShards != 0. */
-      final boolean forceReshard;
       /** The class type of the records. */
       final Class<T> type;
       /** The schema of the output file. */
@@ -484,18 +483,23 @@ public class AvroIO {
       final boolean validate;
 
       Bound(Class<T> type) {
-        this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, true, type, 
null, true);
+        this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, type, null, 
true);
       }
 
-      Bound(String name, String filenamePrefix, String filenameSuffix, int 
numShards,
-          String shardTemplate, boolean forceReshard, Class<T> type, Schema 
schema,
+      Bound(
+          String name,
+          String filenamePrefix,
+          String filenameSuffix,
+          int numShards,
+          String shardTemplate,
+          Class<T> type,
+          Schema schema,
           boolean validate) {
         super(name);
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
         this.numShards = numShards;
         this.shardTemplate = shardTemplate;
-        this.forceReshard = forceReshard;
         this.type = type;
         this.schema = schema;
         this.validate = validate;
@@ -509,8 +513,7 @@ public class AvroIO {
        */
       public Bound<T> named(String name) {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
-            type, schema, validate);
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
type, schema, validate);
       }
 
       /**
@@ -525,8 +528,7 @@ public class AvroIO {
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
-            type, schema, validate);
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
type, schema, validate);
       }
 
       /**
@@ -540,8 +542,7 @@ public class AvroIO {
       public Bound<T> withSuffix(String filenameSuffix) {
         validateOutputComponent(filenameSuffix);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard, type,
-            schema, validate);
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
type, schema, validate);
       }
 
       /**
@@ -559,32 +560,9 @@ public class AvroIO {
        * @see ShardNameTemplate
        */
       public Bound<T> withNumShards(int numShards) {
-        return withNumShards(numShards, forceReshard);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. If forceReshard is true, the output
-       * will be shuffled to obtain the desired sharding. If it is false,
-       * data will not be reshuffled, but parallelism of preceeding stages
-       * may be constrained. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @param forceReshard whether to force a reshard to obtain the desired 
sharding.
-       * @see ShardNameTemplate
-       */
-      private Bound<T> withNumShards(int numShards, boolean forceReshard) {
         Preconditions.checkArgument(numShards >= 0);
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
-            type, schema, validate);
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
type, schema, validate);
       }
 
       /**
@@ -597,8 +575,7 @@ public class AvroIO {
        */
       public Bound<T> withShardNameTemplate(String shardTemplate) {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
-            type, schema, validate);
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
type, schema, validate);
       }
 
       /**
@@ -611,23 +588,7 @@ public class AvroIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutSharding() {
-        return withoutSharding(forceReshard);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       *
-       * @param forceReshard whether to force a reshard to obtain the desired 
sharding.
-       */
-      private Bound<T> withoutSharding(boolean forceReshard) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", 
forceReshard,
-            type, schema, validate);
+        return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", type, 
schema, validate);
       }
 
       /**
@@ -640,8 +601,15 @@ public class AvroIO {
        * @param <X> the type of the elements of the input PCollection
        */
       public <X> Bound<X> withSchema(Class<X> type) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, 
shardTemplate,
-            forceReshard, type, ReflectData.get().getSchema(type), validate);
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            type,
+            ReflectData.get().getSchema(type),
+            validate);
       }
 
       /**
@@ -652,8 +620,15 @@ public class AvroIO {
        * <p>Does not modify this object.
        */
       public Bound<GenericRecord> withSchema(Schema schema) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, 
shardTemplate,
-            forceReshard, GenericRecord.class, schema, validate);
+        return new Bound<>(
+            name,
+            filenamePrefix,
+            filenameSuffix,
+            numShards,
+            shardTemplate,
+            GenericRecord.class,
+            schema,
+            validate);
       }
 
       /**
@@ -679,8 +654,7 @@ public class AvroIO {
        */
       public Bound<T> withoutValidation() {
         return new Bound<>(
-            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
forceReshard,
-            type, schema, false);
+            name, filenamePrefix, filenameSuffix, numShards, shardTemplate, 
type, schema, false);
       }
 
       @Override
@@ -693,14 +667,14 @@ public class AvroIO {
           throw new IllegalStateException("need to set the schema of an 
AvroIO.Write transform");
         }
 
-        if (numShards > 0 && forceReshard) {
-          // Reshard and re-apply a version of this write without resharding.
-          return input
-              .apply(new FileBasedSink.ReshardForWrite<T>())
-              .apply(withNumShards(numShards, false));
-        } else {
-          return PDone.in(input.getPipeline());
-        }
+        // Note that custom sinks currently do not expose sharding controls.
+        // Thus pipeline runner writers need to individually add support 
internally to
+        // apply user requested sharding limits.
+        return input.apply(
+            "Write",
+            com.google.cloud.dataflow.sdk.io.Write.to(
+                new AvroSink<>(
+                    filenamePrefix, filenameSuffix, shardTemplate, 
AvroCoder.of(type, schema))));
       }
 
       /**
@@ -742,21 +716,6 @@ public class AvroIO {
       public boolean needsValidation() {
         return validate;
       }
-
-      static {
-        @SuppressWarnings("rawtypes")
-        DirectPipelineRunner.TransformEvaluator<Bound> transformEvaluator =
-            new DirectPipelineRunner.TransformEvaluator<Bound>() {
-          @Override
-          @SuppressWarnings("unchecked")
-          public void evaluate(
-              Bound transform, DirectPipelineRunner.EvaluationContext context) 
{
-            evaluateWriteHelper(transform, context);
-          }
-        };
-        DirectPipelineRunner.registerDefaultTransformEvaluator(
-            Bound.class, transformEvaluator);
-      }
     }
 
     /** Disallow construction of utility class. */
@@ -779,25 +738,73 @@ public class AvroIO {
   /** Disallow construction of utility class. */
   private AvroIO() {}
 
-  private static <T> void evaluateWriteHelper(
-      Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext 
context) {
-    List<WindowedValue<T>> elems =
-        context.getPCollectionWindowedValues(context.getInput(transform));
-    int numShards = transform.numShards;
-    if (numShards < 1) {
-      // System gets to choose. For direct mode, choose 1.
-      numShards = 1;
+  /**
+   * A {@link FileBasedSink} for Avro files.
+   */
+  @VisibleForTesting
+  static class AvroSink<T> extends FileBasedSink<T> {
+    private final AvroCoder<T> coder;
+
+    @VisibleForTesting
+    AvroSink(
+        String baseOutputFilename, String extension, String fileNameTemplate, 
AvroCoder<T> coder) {
+      super(baseOutputFilename, extension, fileNameTemplate);
+      this.coder = coder;
+    }
+
+    @Override
+    public FileBasedSink.FileBasedWriteOperation<T> 
createWriteOperation(PipelineOptions options) {
+      return new AvroWriteOperation<>(this, coder);
+    }
+
+    /**
+     * A {@link 
com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation
+     * FileBasedWriteOperation} for Avro files.
+     */
+    private static class AvroWriteOperation<T> extends 
FileBasedWriteOperation<T> {
+      private final AvroCoder<T> coder;
+
+      private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder) {
+        super(sink);
+        this.coder = coder;
+      }
+
+      @Override
+      public FileBasedWriter<T> createWriter(PipelineOptions options) throws 
Exception {
+        return new AvroWriter<>(this, coder);
+      }
     }
-    AvroSink<T> writer = new AvroSink<>(transform.filenamePrefix, 
transform.shardTemplate,
-        transform.filenameSuffix, numShards,
-        WindowedValue.getValueOnlyCoder(AvroCoder.of(transform.type, 
transform.schema)));
-    try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) {
-      for (WindowedValue<T> elem : elems) {
-        sink.add(elem);
-      }
-    } catch (IOException exn) {
-      throw new RuntimeException(
-          "unable to write to output file \"" + transform.filenamePrefix + 
"\"", exn);
+
+    /**
+     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter 
FileBasedWriter}
+     * for Avro files.
+     */
+    private static class AvroWriter<T> extends FileBasedWriter<T> {
+      private final AvroCoder<T> coder;
+      private DataFileWriter<T> dataFileWriter;
+
+      public AvroWriter(FileBasedWriteOperation<T> writeOperation, 
AvroCoder<T> coder) {
+        super(writeOperation);
+        this.mimeType = MimeTypes.BINARY;
+        this.coder = coder;
+      }
+
+      @SuppressWarnings("deprecation") // uses internal test functionality.
+      @Override
+      protected void prepareWrite(WritableByteChannel channel) throws 
Exception {
+        dataFileWriter = new DataFileWriter<>(coder.createDatumWriter());
+        dataFileWriter.create(coder.getSchema(), 
Channels.newOutputStream(channel));
+      }
+
+      @Override
+      public void write(T value) throws Exception {
+        dataFileWriter.append(value);
+      }
+
+      @Override
+      protected void writeFooter() throws Exception {
+        dataFileWriter.flush();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index f20caa3..ac0dcea 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -32,6 +32,7 @@ import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
 import com.google.cloud.dataflow.sdk.PipelineResult.State;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -48,8 +49,10 @@ import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
 import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
 import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.FileBasedSink;
 import com.google.cloud.dataflow.sdk.io.PubsubIO;
 import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
 import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.io.Write;
@@ -60,6 +63,8 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
 import com.google.cloud.dataflow.sdk.options.StreamingOptions;
 import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification;
+import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
+import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
 import com.google.cloud.dataflow.sdk.runners.dataflow.AssignWindows;
 import 
com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms;
 import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
@@ -83,6 +88,7 @@ import 
com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
 import com.google.cloud.dataflow.sdk.transforms.WithKeys;
 import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
@@ -331,6 +337,8 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
       ImmutableMap.Builder<Class<?>, Class<?>> builder = 
ImmutableMap.<Class<?>, Class<?>>builder();
       builder.put(Read.Unbounded.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
+      builder.put(Write.Bound.class, BatchWrite.class);
+      builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class);
       if (options.getExperiments() == null
           || !options.getExperiments().contains("disable_ism_side_input")) {
         builder.put(View.AsMap.class, BatchViewAsMap.class);
@@ -1919,6 +1927,187 @@ public class DataflowPipelineRunner extends 
PipelineRunner<DataflowPipelineJob>
   }
 
   /**
+   * A {@link PTransform} that uses shuffle to create a fusion break. This 
allows pushing
+   * parallelism limits such as sharding controls further down the pipeline.
+   */
+  private static class ReshardForWrite<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      return input
+          // TODO: This would need to be adapted to write per-window shards.
+          .apply(
+              Window.<T>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes())
+          .apply(
+              "RandomKey",
+              ParDo.of(
+                  new DoFn<T, KV<Long, T>>() {
+                    transient long counter, step;
+
+                    @Override
+                    public void startBundle(Context c) {
+                      counter = (long) (Math.random() * Long.MAX_VALUE);
+                      step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void processElement(ProcessContext c) {
+                      counter += step;
+                      c.output(KV.of(counter, c.element()));
+                    }
+                  }))
+          .apply(GroupByKey.<Long, T>create())
+          .apply(
+              "Ungroup",
+              ParDo.of(
+                  new DoFn<KV<Long, Iterable<T>>, T>() {
+                    @Override
+                    public void processElement(ProcessContext c) {
+                      for (T item : c.element().getValue()) {
+                        c.output(item);
+                      }
+                    }
+                  }));
+    }
+  }
+
+  /**
+   * Specialized implementation which overrides
+   * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound} to 
provide Google
+   * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
+   */
+  private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> 
{
+    private final DataflowPipelineRunner runner;
+    private final Write.Bound<T> transform;
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in 
DataflowPipelineRunner#apply()
+    public BatchWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) 
{
+      this.runner = runner;
+      this.transform = transform;
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      if (transform.getSink() instanceof FileBasedSink) {
+        FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
+        PathValidator validator = runner.options.getPathValidator();
+        
validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
+      }
+      return transform.apply(input);
+    }
+  }
+
+  /**
+   * Specialized implementation which overrides
+   * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound 
AvroIO.Write.Bound} with
+   * a native sink instead of a custom sink as workaround until custom sinks
+   * have support for sharding controls.
+   */
+  private static class BatchAvroIOWrite<T> extends PTransform<PCollection<T>, 
PDone> {
+    private final AvroIO.Write.Bound<T> transform;
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in 
DataflowPipelineRunner#apply()
+    public BatchAvroIOWrite(DataflowPipelineRunner runner, 
AvroIO.Write.Bound<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      if (transform.getNumShards() > 0) {
+        return input.apply(new ReshardForWrite<T>()).apply(new 
BatchAvroIONativeWrite<>(transform));
+      } else {
+        return transform.apply(input);
+      }
+    }
+  }
+
+  /**
+   * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} 
as a way
+   * to provide the native definition of the Avro sink.
+   */
+  private static class BatchAvroIONativeWrite<T> extends 
PTransform<PCollection<T>, PDone> {
+    private final AvroIO.Write.Bound<T> transform;
+
+    public BatchAvroIONativeWrite(AvroIO.Write.Bound<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      return PDone.in(input.getPipeline());
+    }
+
+    static {
+      DataflowPipelineTranslator.registerTransformTranslator(
+          BatchAvroIONativeWrite.class, new 
BatchAvroIONativeWriteTranslator());
+    }
+  }
+
+  /**
+   * AvroIO.Write.Bound support code for the Dataflow backend when applying 
parallelism limits
+   * through user requested sharding limits.
+   */
+  private static class BatchAvroIONativeWriteTranslator
+      implements TransformTranslator<BatchAvroIONativeWrite<?>> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void translate(
+        @SuppressWarnings("rawtypes") BatchAvroIONativeWrite transform,
+        TranslationContext context) {
+      translateWriteHelper(transform, transform.transform, context);
+    }
+
+    private <T> void translateWriteHelper(
+        BatchAvroIONativeWrite<T> transform,
+        AvroIO.Write.Bound<T> originalTransform,
+        TranslationContext context) {
+      // Note that the original transform can not be used during add step/add 
input
+      // and is only passed in to get properties from it.
+
+      checkState(
+          originalTransform.getNumShards() > 0,
+          "Native AvroSink is expected to only be used when sharding controls 
are required.");
+
+      context.addStep(transform, "ParallelWrite");
+      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
+
+      // TODO: drop this check when server supports alternative templates.
+      switch (originalTransform.getShardTemplate()) {
+        case ShardNameTemplate.INDEX_OF_MAX:
+          break; // supported by server
+        case "":
+          // Empty shard template allowed - forces single output.
+          Preconditions.checkArgument(
+              originalTransform.getNumShards() <= 1,
+              "Num shards must be <= 1 when using an empty sharding template");
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Shard template "
+                  + originalTransform.getShardTemplate()
+                  + " not yet supported by Dataflow service");
+      }
+
+      context.addInput(PropertyNames.FORMAT, "avro");
+      context.addInput(PropertyNames.FILENAME_PREFIX, 
originalTransform.getFilenamePrefix());
+      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, 
originalTransform.getShardTemplate());
+      context.addInput(PropertyNames.FILENAME_SUFFIX, 
originalTransform.getFilenameSuffix());
+      context.addInput(PropertyNames.VALIDATE_SINK, 
originalTransform.needsValidation());
+
+      context.addInput(PropertyNames.NUM_SHARDS, (long) 
originalTransform.getNumShards());
+
+      context.addEncodingInput(
+          WindowedValue.getValueOnlyCoder(
+              AvroCoder.of(originalTransform.getType(), 
originalTransform.getSchema())));
+    }
+  }
+
+  /**
    * Specialized (non-)implementation for
    * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound}
    * for the Dataflow runner in streaming mode.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index f7217f7..885260e 100644
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -41,7 +41,6 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO;
 import com.google.cloud.dataflow.sdk.io.PubsubIO;
 import com.google.cloud.dataflow.sdk.io.Read;
@@ -49,7 +48,6 @@ import com.google.cloud.dataflow.sdk.io.TextIO;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.StreamingOptions;
 import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
-import com.google.cloud.dataflow.sdk.runners.dataflow.AvroIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator;
 import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator;
@@ -1029,9 +1027,6 @@ public class DataflowPipelineTranslator {
     // IO Translation.
 
     registerTransformTranslator(
-        AvroIO.Write.Bound.class, new AvroIOTranslator.WriteTranslator());
-
-    registerTransformTranslator(
         BigQueryIO.Read.Bound.class, new 
BigQueryIOTranslator.ReadTranslator());
     registerTransformTranslator(
         BigQueryIO.Write.Bound.class, new 
BigQueryIOTranslator.WriteTranslator());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
deleted file mode 100644
index b114021..0000000
--- 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy 
of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.runners.dataflow;
-
-import com.google.cloud.dataflow.sdk.coders.AvroCoder;
-import com.google.cloud.dataflow.sdk.io.AvroIO;
-import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
-import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator;
-import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
-import com.google.cloud.dataflow.sdk.util.PathValidator;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.common.base.Preconditions;
-
-/**
- * Avro transform support code for the Dataflow backend.
- */
-public class AvroIOTranslator {
-
-  /**
-   * Implements AvroIO Write translation for the Dataflow backend.
-   */
-  @SuppressWarnings("rawtypes")
-  public static class WriteTranslator implements 
TransformTranslator<AvroIO.Write.Bound> {
-
-    @Override
-    public void translate(
-        AvroIO.Write.Bound transform,
-        TranslationContext context) {
-      translateWriteHelper(transform, context);
-    }
-
-    private <T> void translateWriteHelper(
-        AvroIO.Write.Bound<T> transform,
-        TranslationContext context) {
-      PathValidator validator = 
context.getPipelineOptions().getPathValidator();
-      String filenamePrefix = validator.validateOutputFilePrefixSupported(
-          transform.getFilenamePrefix());
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.PARALLEL_INPUT, 
context.getInput(transform));
-
-      // TODO: drop this check when server supports alternative templates.
-      switch (transform.getShardTemplate()) {
-        case ShardNameTemplate.INDEX_OF_MAX:
-          break;  // supported by server
-        case "":
-          // Empty shard template allowed - forces single output.
-          Preconditions.checkArgument(transform.getNumShards() <= 1,
-              "Num shards must be <= 1 when using an empty sharding template");
-          break;
-        default:
-          throw new UnsupportedOperationException("Shard template "
-              + transform.getShardTemplate()
-              + " not yet supported by Dataflow service");
-      }
-
-      context.addInput(PropertyNames.FORMAT, "avro");
-      context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix);
-      context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, 
transform.getShardTemplate());
-      context.addInput(PropertyNames.FILENAME_SUFFIX, 
transform.getFilenameSuffix());
-      context.addInput(PropertyNames.VALIDATE_SINK, 
transform.needsValidation());
-
-      long numShards = transform.getNumShards();
-      if (numShards > 0) {
-        context.addInput(PropertyNames.NUM_SHARDS, numShards);
-      }
-
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(
-              AvroCoder.of(transform.getType(), transform.getSchema())));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
index 6bb459d..6a7679f 100644
--- 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
+++ 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java
@@ -146,104 +146,110 @@ public class AvroIOGeneratedClassTest {
 
   @Test
   public void testReadFromGeneratedClass() throws Exception {
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .withSchema(AvroGeneratedUser.class),
-                "AvroIO.Read/Read(AvroSource).out", generateAvroObjects());
-    runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class)
-                           .from(avroFile.getPath()),
-                "AvroIO.Read/Read(AvroSource).out", generateAvroObjects());
-    runTestRead(AvroIO.Read.named("MyRead")
-                           .from(avroFile.getPath())
-                           .withSchema(AvroGeneratedUser.class),
-                "MyRead/Read(AvroSource).out", generateAvroObjects());
-    runTestRead(AvroIO.Read.named("MyRead")
-                           .withSchema(AvroGeneratedUser.class)
-                           .from(avroFile.getPath()),
-                "MyRead/Read(AvroSource).out", generateAvroObjects());
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .withSchema(AvroGeneratedUser.class)
-                           .named("HerRead"),
-                "HerRead/Read(AvroSource).out", generateAvroObjects());
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .named("HerRead")
-                           .withSchema(AvroGeneratedUser.class),
-                "HerRead/Read(AvroSource).out", generateAvroObjects());
-    runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class)
-                           .named("HerRead")
-                           .from(avroFile.getPath()),
-                "HerRead/Read(AvroSource).out", generateAvroObjects());
-    runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class)
-                           .from(avroFile.getPath())
-                           .named("HerRead"),
-                "HerRead/Read(AvroSource).out", generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+        "AvroIO.Read/Read.out",
+        generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
+        "AvroIO.Read/Read.out",
+        generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+        "MyRead/Read.out",
+        generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.named("MyRead").withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
+        "MyRead/Read.out",
+        generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class).named("HerRead"),
+        "HerRead/Read.out",
+        generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(AvroGeneratedUser.class),
+        "HerRead/Read.out",
+        generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.withSchema(AvroGeneratedUser.class).named("HerRead").from(avroFile.getPath()),
+        "HerRead/Read.out",
+        generateAvroObjects());
+    runTestRead(
+        AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()).named("HerRead"),
+        "HerRead/Read.out",
+        generateAvroObjects());
   }
 
   @Test
   public void testReadFromSchema() throws Exception {
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .withSchema(schema),
-                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.withSchema(schema)
-                           .from(avroFile.getPath()),
-                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.named("MyRead")
-                           .from(avroFile.getPath())
-                           .withSchema(schema),
-                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.named("MyRead")
-                           .withSchema(schema)
-                           .from(avroFile.getPath()),
-                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .withSchema(schema)
-                           .named("HerRead"),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .named("HerRead")
-                           .withSchema(schema),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.withSchema(schema)
-                           .named("HerRead")
-                           .from(avroFile.getPath()),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.withSchema(schema)
-                           .from(avroFile.getPath())
-                           .named("HerRead"),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
+        "AvroIO.Read/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
+        "AvroIO.Read/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schema),
+        "MyRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.named("MyRead").withSchema(schema).from(avroFile.getPath()),
+        "MyRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema).named("HerRead"),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schema),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.withSchema(schema).named("HerRead").from(avroFile.getPath()),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.withSchema(schema).from(avroFile.getPath()).named("HerRead"),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
   }
 
   @Test
   public void testReadFromSchemaString() throws Exception {
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .withSchema(schemaString),
-                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.withSchema(schemaString)
-                           .from(avroFile.getPath()),
-                "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.named("MyRead")
-                           .from(avroFile.getPath())
-                           .withSchema(schemaString),
-                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.named("MyRead")
-                           .withSchema(schemaString)
-                           .from(avroFile.getPath()),
-                "MyRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .withSchema(schemaString)
-                           .named("HerRead"),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.from(avroFile.getPath())
-                           .named("HerRead")
-                           .withSchema(schemaString),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.withSchema(schemaString)
-                           .named("HerRead")
-                           .from(avroFile.getPath()),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
-    runTestRead(AvroIO.Read.withSchema(schemaString)
-                           .from(avroFile.getPath())
-                           .named("HerRead"),
-                "HerRead/Read(AvroSource).out", generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString),
+        "AvroIO.Read/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
+        "AvroIO.Read/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schemaString),
+        "MyRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.named("MyRead").withSchema(schemaString).from(avroFile.getPath()),
+        "MyRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString).named("HerRead"),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schemaString),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.withSchema(schemaString).named("HerRead").from(avroFile.getPath()),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
+    runTestRead(
+        AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()).named("HerRead"),
+        "HerRead/Read.out",
+        generateAvroGenericRecords());
   }
 
   <T> void runTestWrite(AvroIO.Write.Bound<T> write, String expectedName)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
index 30c578b..2258a91 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
@@ -16,19 +16,25 @@
 
 package com.google.cloud.dataflow.sdk.io;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
 import com.google.cloud.dataflow.sdk.runners.DirectPipeline;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
 
+import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.junit.Rule;
@@ -38,6 +44,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -104,7 +111,7 @@ public class AvroIOTest {
   }
 
   @Test
-  public void testAvroIOWriteAndRead() throws Throwable {
+  public void testAvroIOWriteAndReadASingleFile() throws Throwable {
     DirectPipeline p = DirectPipeline.createForTest();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
@@ -189,6 +196,31 @@ public class AvroIOTest {
     p.run();
   }
 
+  @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests.
+  @Test
+  public void testAvroSinkWrite() throws Exception {
+    String outputFilePrefix = new File(tmpFolder.getRoot(), "prefix").getAbsolutePath();
+    String[] expectedElements = new String[] {"first", "second", "third"};
+
+    TestPipeline p = TestPipeline.create();
+    p.apply(Create.<String>of(expectedElements))
+        .apply(AvroIO.Write.to(outputFilePrefix).withSchema(String.class));
+    p.run();
+
+    // Validate that the data written matches the expected elements in the expected order
+    String expectedName =
+        IOChannelUtils.constructName(
+            outputFilePrefix, ShardNameTemplate.INDEX_OF_MAX, "" /* no suffix */, 0, 1);
+    File outputFile = new File(expectedName);
+    assertTrue("Expected output file " + expectedName, outputFile.exists());
+    try (DataFileReader<String> reader =
+            new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) {
+      List<String> actualElements = new ArrayList<>();
+      Iterators.addAll(actualElements, reader);
+      assertThat(actualElements, containsInAnyOrder(expectedElements));
+    }
+  }
+
   // TODO: for Write only, test withSuffix, withNumShards,
   // withShardNameTemplate and withoutSharding.
 }


Reply via email to