Resubmit "Migrate AvroIO.Write to a custom sink". Note: for user-requested sharding limits to be supported, each pipeline runner must support applying those sharding limits.
Google Cloud Dataflow supports sharding limits. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115402880 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01a0da02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01a0da02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01a0da02 Branch: refs/heads/master Commit: 01a0da02daed5f1609237ae85c82fd056ea76339 Parents: 2e89a4b Author: dhalperi <[email protected]> Authored: Tue Feb 23 17:35:40 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:27 2016 -0800 ---------------------------------------------------------------------- .../google/cloud/dataflow/sdk/io/AvroIO.java | 223 ++++++++++--------- .../sdk/runners/DataflowPipelineRunner.java | 189 ++++++++++++++++ .../sdk/runners/DataflowPipelineTranslator.java | 5 - .../sdk/runners/dataflow/AvroIOTranslator.java | 87 -------- .../sdk/io/AvroIOGeneratedClassTest.java | 186 ++++++++-------- .../cloud/dataflow/sdk/io/AvroIOTest.java | 34 ++- 6 files changed, 433 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java index 9ee7e6b..f016b5b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java @@ -22,24 +22,25 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; import 
com.google.cloud.dataflow.sdk.io.Read.Bounded; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.runners.worker.AvroSink; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.common.worker.Sink; +import com.google.cloud.dataflow.sdk.util.MimeTypes; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; import java.io.IOException; -import java.util.List; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -317,7 +318,7 @@ public class AvroIO { : com.google.cloud.dataflow.sdk.io.Read.from( AvroSource.from(filepattern).withSchema(type)); - PCollection<T> pcol = input.getPipeline().apply(read); + PCollection<T> pcol = input.getPipeline().apply("Read", read); // Honor the default output coder that would have been used by this PTransform. pcol.setCoder(getDefaultOutputCoder()); return pcol; @@ -473,8 +474,6 @@ public class AvroIO { final int numShards; /** Shard template string. */ final String shardTemplate; - /** Insert a shuffle before writing to decouple parallelism when numShards != 0. */ - final boolean forceReshard; /** The class type of the records. */ final Class<T> type; /** The schema of the output file. 
*/ @@ -484,18 +483,23 @@ public class AvroIO { final boolean validate; Bound(Class<T> type) { - this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, true, type, null, true); + this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, type, null, true); } - Bound(String name, String filenamePrefix, String filenameSuffix, int numShards, - String shardTemplate, boolean forceReshard, Class<T> type, Schema schema, + Bound( + String name, + String filenamePrefix, + String filenameSuffix, + int numShards, + String shardTemplate, + Class<T> type, + Schema schema, boolean validate) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; this.numShards = numShards; this.shardTemplate = shardTemplate; - this.forceReshard = forceReshard; this.type = type; this.schema = schema; this.validate = validate; @@ -509,8 +513,7 @@ public class AvroIO { */ public Bound<T> named(String name) { return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, - type, schema, validate); + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); } /** @@ -525,8 +528,7 @@ public class AvroIO { public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, - type, schema, validate); + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); } /** @@ -540,8 +542,7 @@ public class AvroIO { public Bound<T> withSuffix(String filenameSuffix) { validateOutputComponent(filenameSuffix); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, type, - schema, validate); + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); } /** @@ -559,32 +560,9 @@ public class AvroIO { * @see ShardNameTemplate */ public Bound<T> withNumShards(int numShards) { - return 
withNumShards(numShards, forceReshard); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that uses the provided shard count. - * - * <p>Constraining the number of shards is likely to reduce - * the performance of a pipeline. If forceReshard is true, the output - * will be shuffled to obtain the desired sharding. If it is false, - * data will not be reshuffled, but parallelism of preceeding stages - * may be constrained. Setting this value is not recommended - * unless you require a specific number of output files. - * - * <p>Does not modify this object. - * - * @param numShards the number of shards to use, or 0 to let the system - * decide. - * @param forceReshard whether to force a reshard to obtain the desired sharding. - * @see ShardNameTemplate - */ - private Bound<T> withNumShards(int numShards, boolean forceReshard) { Preconditions.checkArgument(numShards >= 0); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, - type, schema, validate); + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); } /** @@ -597,8 +575,7 @@ public class AvroIO { */ public Bound<T> withShardNameTemplate(String shardTemplate) { return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, - type, schema, validate); + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate); } /** @@ -611,23 +588,7 @@ public class AvroIO { * <p>Does not modify this object. */ public Bound<T> withoutSharding() { - return withoutSharding(forceReshard); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that forces a single file as output. - * - * <p>This is a shortcut for - * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")} - * - * <p>Does not modify this object. - * - * @param forceReshard whether to force a reshard to obtain the desired sharding. 
- */ - private Bound<T> withoutSharding(boolean forceReshard) { - return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", forceReshard, - type, schema, validate); + return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", type, schema, validate); } /** @@ -640,8 +601,15 @@ public class AvroIO { * @param <X> the type of the elements of the input PCollection */ public <X> Bound<X> withSchema(Class<X> type) { - return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, - forceReshard, type, ReflectData.get().getSchema(type), validate); + return new Bound<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + type, + ReflectData.get().getSchema(type), + validate); } /** @@ -652,8 +620,15 @@ public class AvroIO { * <p>Does not modify this object. */ public Bound<GenericRecord> withSchema(Schema schema) { - return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, - forceReshard, GenericRecord.class, schema, validate); + return new Bound<>( + name, + filenamePrefix, + filenameSuffix, + numShards, + shardTemplate, + GenericRecord.class, + schema, + validate); } /** @@ -679,8 +654,7 @@ public class AvroIO { */ public Bound<T> withoutValidation() { return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, - type, schema, false); + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, false); } @Override @@ -693,14 +667,14 @@ public class AvroIO { throw new IllegalStateException("need to set the schema of an AvroIO.Write transform"); } - if (numShards > 0 && forceReshard) { - // Reshard and re-apply a version of this write without resharding. - return input - .apply(new FileBasedSink.ReshardForWrite<T>()) - .apply(withNumShards(numShards, false)); - } else { - return PDone.in(input.getPipeline()); - } + // Note that custom sinks currently do not expose sharding controls. 
+ // Thus pipeline runner writers need to individually add support internally to + // apply user requested sharding limits. + return input.apply( + "Write", + com.google.cloud.dataflow.sdk.io.Write.to( + new AvroSink<>( + filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)))); } /** @@ -742,21 +716,6 @@ public class AvroIO { public boolean needsValidation() { return validate; } - - static { - @SuppressWarnings("rawtypes") - DirectPipelineRunner.TransformEvaluator<Bound> transformEvaluator = - new DirectPipelineRunner.TransformEvaluator<Bound>() { - @Override - @SuppressWarnings("unchecked") - public void evaluate( - Bound transform, DirectPipelineRunner.EvaluationContext context) { - evaluateWriteHelper(transform, context); - } - }; - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, transformEvaluator); - } } /** Disallow construction of utility class. */ @@ -779,25 +738,73 @@ public class AvroIO { /** Disallow construction of utility class. */ private AvroIO() {} - private static <T> void evaluateWriteHelper( - Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext context) { - List<WindowedValue<T>> elems = - context.getPCollectionWindowedValues(context.getInput(transform)); - int numShards = transform.numShards; - if (numShards < 1) { - // System gets to choose. For direct mode, choose 1. - numShards = 1; + /** + * A {@link FileBasedSink} for Avro files. 
+ */ + @VisibleForTesting + static class AvroSink<T> extends FileBasedSink<T> { + private final AvroCoder<T> coder; + + @VisibleForTesting + AvroSink( + String baseOutputFilename, String extension, String fileNameTemplate, AvroCoder<T> coder) { + super(baseOutputFilename, extension, fileNameTemplate); + this.coder = coder; + } + + @Override + public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { + return new AvroWriteOperation<>(this, coder); + } + + /** + * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation + * FileBasedWriteOperation} for Avro files. + */ + private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> { + private final AvroCoder<T> coder; + + private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder) { + super(sink); + this.coder = coder; + } + + @Override + public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { + return new AvroWriter<>(this, coder); + } } - AvroSink<T> writer = new AvroSink<>(transform.filenamePrefix, transform.shardTemplate, - transform.filenameSuffix, numShards, - WindowedValue.getValueOnlyCoder(AvroCoder.of(transform.type, transform.schema))); - try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) { - for (WindowedValue<T> elem : elems) { - sink.add(elem); - } - } catch (IOException exn) { - throw new RuntimeException( - "unable to write to output file \"" + transform.filenamePrefix + "\"", exn); + + /** + * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} + * for Avro files. 
+ */ + private static class AvroWriter<T> extends FileBasedWriter<T> { + private final AvroCoder<T> coder; + private DataFileWriter<T> dataFileWriter; + + public AvroWriter(FileBasedWriteOperation<T> writeOperation, AvroCoder<T> coder) { + super(writeOperation); + this.mimeType = MimeTypes.BINARY; + this.coder = coder; + } + + @SuppressWarnings("deprecation") // uses internal test functionality. + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { + dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()); + dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); + } + + @Override + public void write(T value) throws Exception { + dataFileWriter.append(value); + } + + @Override + protected void writeFooter() throws Exception { + dataFileWriter.flush(); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index f20caa3..ac0dcea 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -32,6 +32,7 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; import com.google.cloud.dataflow.sdk.PipelineResult.State; import com.google.cloud.dataflow.sdk.annotations.Experimental; +import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder; import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -48,8 +49,10 @@ import 
com.google.cloud.dataflow.sdk.coders.VarIntCoder; import com.google.cloud.dataflow.sdk.coders.VarLongCoder; import com.google.cloud.dataflow.sdk.io.AvroIO; import com.google.cloud.dataflow.sdk.io.BigQueryIO; +import com.google.cloud.dataflow.sdk.io.FileBasedSink; import com.google.cloud.dataflow.sdk.io.PubsubIO; import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.io.UnboundedSource; import com.google.cloud.dataflow.sdk.io.Write; @@ -60,6 +63,8 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; import com.google.cloud.dataflow.sdk.options.StreamingOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; import com.google.cloud.dataflow.sdk.runners.dataflow.AssignWindows; import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator; @@ -83,6 +88,7 @@ import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; import com.google.cloud.dataflow.sdk.transforms.WithKeys; import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; @@ -331,6 +337,8 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> 
ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder(); builder.put(Read.Unbounded.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); + builder.put(Write.Bound.class, BatchWrite.class); + builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class); if (options.getExperiments() == null || !options.getExperiments().contains("disable_ism_side_input")) { builder.put(View.AsMap.class, BatchViewAsMap.class); @@ -1919,6 +1927,187 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> } /** + * A {@link PTransform} that uses shuffle to create a fusion break. This allows pushing + * parallelism limits such as sharding controls further down the pipeline. + */ + private static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> { + @Override + public PCollection<T> apply(PCollection<T> input) { + return input + // TODO: This would need to be adapted to write per-window shards. + .apply( + Window.<T>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()) + .apply( + "RandomKey", + ParDo.of( + new DoFn<T, KV<Long, T>>() { + transient long counter, step; + + @Override + public void startBundle(Context c) { + counter = (long) (Math.random() * Long.MAX_VALUE); + step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE); + } + + @Override + public void processElement(ProcessContext c) { + counter += step; + c.output(KV.of(counter, c.element())); + } + })) + .apply(GroupByKey.<Long, T>create()) + .apply( + "Ungroup", + ParDo.of( + new DoFn<KV<Long, Iterable<T>>, T>() { + @Override + public void processElement(ProcessContext c) { + for (T item : c.element().getValue()) { + c.output(item); + } + } + })); + } + } + + /** + * Specialized implementation which overrides + * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound} to provide Google + * Cloud Dataflow specific path validation of {@link FileBasedSink}s. 
+ */ + private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> { + private final DataflowPipelineRunner runner; + private final Write.Bound<T> transform; + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public BatchWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) { + this.runner = runner; + this.transform = transform; + } + + @Override + public PDone apply(PCollection<T> input) { + if (transform.getSink() instanceof FileBasedSink) { + FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink(); + PathValidator validator = runner.options.getPathValidator(); + validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename()); + } + return transform.apply(input); + } + } + + /** + * Specialized implementation which overrides + * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound AvroIO.Write.Bound} with + * a native sink instead of a custom sink as workaround until custom sinks + * have support for sharding controls. + */ + private static class BatchAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> { + private final AvroIO.Write.Bound<T> transform; + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public BatchAvroIOWrite(DataflowPipelineRunner runner, AvroIO.Write.Bound<T> transform) { + this.transform = transform; + } + + @Override + public PDone apply(PCollection<T> input) { + if (transform.getNumShards() > 0) { + return input.apply(new ReshardForWrite<T>()).apply(new BatchAvroIONativeWrite<>(transform)); + } else { + return transform.apply(input); + } + } + } + + /** + * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way + * to provide the native definition of the Avro sink. 
+ */ + private static class BatchAvroIONativeWrite<T> extends PTransform<PCollection<T>, PDone> { + private final AvroIO.Write.Bound<T> transform; + + public BatchAvroIONativeWrite(AvroIO.Write.Bound<T> transform) { + this.transform = transform; + } + + @Override + public PDone apply(PCollection<T> input) { + return PDone.in(input.getPipeline()); + } + + static { + DataflowPipelineTranslator.registerTransformTranslator( + BatchAvroIONativeWrite.class, new BatchAvroIONativeWriteTranslator()); + } + } + + /** + * AvroIO.Write.Bound support code for the Dataflow backend when applying parallelism limits + * through user requested sharding limits. + */ + private static class BatchAvroIONativeWriteTranslator + implements TransformTranslator<BatchAvroIONativeWrite<?>> { + @SuppressWarnings("unchecked") + @Override + public void translate( + @SuppressWarnings("rawtypes") BatchAvroIONativeWrite transform, + TranslationContext context) { + translateWriteHelper(transform, transform.transform, context); + } + + private <T> void translateWriteHelper( + BatchAvroIONativeWrite<T> transform, + AvroIO.Write.Bound<T> originalTransform, + TranslationContext context) { + // Note that the original transform can not be used during add step/add input + // and is only passed in to get properties from it. + + checkState( + originalTransform.getNumShards() > 0, + "Native AvroSink is expected to only be used when sharding controls are required."); + + context.addStep(transform, "ParallelWrite"); + context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + + // TODO: drop this check when server supports alternative templates. + switch (originalTransform.getShardTemplate()) { + case ShardNameTemplate.INDEX_OF_MAX: + break; // supported by server + case "": + // Empty shard template allowed - forces single output. 
+ Preconditions.checkArgument( + originalTransform.getNumShards() <= 1, + "Num shards must be <= 1 when using an empty sharding template"); + break; + default: + throw new UnsupportedOperationException( + "Shard template " + + originalTransform.getShardTemplate() + + " not yet supported by Dataflow service"); + } + + context.addInput(PropertyNames.FORMAT, "avro"); + context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix()); + context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, originalTransform.getShardTemplate()); + context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); + context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); + + context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); + + context.addEncodingInput( + WindowedValue.getValueOnlyCoder( + AvroCoder.of(originalTransform.getType(), originalTransform.getSchema()))); + } + } + + /** * Specialized (non-)implementation for * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound} * for the Dataflow runner in streaming mode. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index f7217f7..885260e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -41,7 +41,6 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.IterableCoder; -import com.google.cloud.dataflow.sdk.io.AvroIO; import com.google.cloud.dataflow.sdk.io.BigQueryIO; import com.google.cloud.dataflow.sdk.io.PubsubIO; import com.google.cloud.dataflow.sdk.io.Read; @@ -49,7 +48,6 @@ import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.StreamingOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; -import com.google.cloud.dataflow.sdk.runners.dataflow.AvroIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator; @@ -1029,9 +1027,6 @@ public class DataflowPipelineTranslator { // IO Translation. 
registerTransformTranslator( - AvroIO.Write.Bound.class, new AvroIOTranslator.WriteTranslator()); - - registerTransformTranslator( BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator()); registerTransformTranslator( BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java deleted file mode 100644 index b114021..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import com.google.cloud.dataflow.sdk.coders.AvroCoder; -import com.google.cloud.dataflow.sdk.io.AvroIO; -import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; -import com.google.cloud.dataflow.sdk.util.PathValidator; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.common.base.Preconditions; - -/** - * Avro transform support code for the Dataflow backend. - */ -public class AvroIOTranslator { - - /** - * Implements AvroIO Write translation for the Dataflow backend. - */ - @SuppressWarnings("rawtypes") - public static class WriteTranslator implements TransformTranslator<AvroIO.Write.Bound> { - - @Override - public void translate( - AvroIO.Write.Bound transform, - TranslationContext context) { - translateWriteHelper(transform, context); - } - - private <T> void translateWriteHelper( - AvroIO.Write.Bound<T> transform, - TranslationContext context) { - PathValidator validator = context.getPipelineOptions().getPathValidator(); - String filenamePrefix = validator.validateOutputFilePrefixSupported( - transform.getFilenamePrefix()); - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - - // TODO: drop this check when server supports alternative templates. - switch (transform.getShardTemplate()) { - case ShardNameTemplate.INDEX_OF_MAX: - break; // supported by server - case "": - // Empty shard template allowed - forces single output. 
- Preconditions.checkArgument(transform.getNumShards() <= 1, - "Num shards must be <= 1 when using an empty sharding template"); - break; - default: - throw new UnsupportedOperationException("Shard template " - + transform.getShardTemplate() - + " not yet supported by Dataflow service"); - } - - context.addInput(PropertyNames.FORMAT, "avro"); - context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix); - context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, transform.getShardTemplate()); - context.addInput(PropertyNames.FILENAME_SUFFIX, transform.getFilenameSuffix()); - context.addInput(PropertyNames.VALIDATE_SINK, transform.needsValidation()); - - long numShards = transform.getNumShards(); - if (numShards > 0) { - context.addInput(PropertyNames.NUM_SHARDS, numShards); - } - - context.addEncodingInput( - WindowedValue.getValueOnlyCoder( - AvroCoder.of(transform.getType(), transform.getSchema()))); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java index 6bb459d..6a7679f 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java @@ -146,104 +146,110 @@ public class AvroIOGeneratedClassTest { @Test public void testReadFromGeneratedClass() throws Exception { - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class), - "AvroIO.Read/Read(AvroSource).out", generateAvroObjects()); - runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class) - .from(avroFile.getPath()), - "AvroIO.Read/Read(AvroSource).out", generateAvroObjects()); - 
runTestRead(AvroIO.Read.named("MyRead") - .from(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class), - "MyRead/Read(AvroSource).out", generateAvroObjects()); - runTestRead(AvroIO.Read.named("MyRead") - .withSchema(AvroGeneratedUser.class) - .from(avroFile.getPath()), - "MyRead/Read(AvroSource).out", generateAvroObjects()); - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class) - .named("HerRead"), - "HerRead/Read(AvroSource).out", generateAvroObjects()); - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .named("HerRead") - .withSchema(AvroGeneratedUser.class), - "HerRead/Read(AvroSource).out", generateAvroObjects()); - runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class) - .named("HerRead") - .from(avroFile.getPath()), - "HerRead/Read(AvroSource).out", generateAvroObjects()); - runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class) - .from(avroFile.getPath()) - .named("HerRead"), - "HerRead/Read(AvroSource).out", generateAvroObjects()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), + "AvroIO.Read/Read.out", + generateAvroObjects()); + runTestRead( + AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), + "AvroIO.Read/Read.out", + generateAvroObjects()); + runTestRead( + AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), + "MyRead/Read.out", + generateAvroObjects()); + runTestRead( + AvroIO.Read.named("MyRead").withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), + "MyRead/Read.out", + generateAvroObjects()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class).named("HerRead"), + "HerRead/Read.out", + generateAvroObjects()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(AvroGeneratedUser.class), + "HerRead/Read.out", + generateAvroObjects()); + runTestRead( + 
AvroIO.Read.withSchema(AvroGeneratedUser.class).named("HerRead").from(avroFile.getPath()), + "HerRead/Read.out", + generateAvroObjects()); + runTestRead( + AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()).named("HerRead"), + "HerRead/Read.out", + generateAvroObjects()); } @Test public void testReadFromSchema() throws Exception { - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .withSchema(schema), - "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.withSchema(schema) - .from(avroFile.getPath()), - "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.named("MyRead") - .from(avroFile.getPath()) - .withSchema(schema), - "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.named("MyRead") - .withSchema(schema) - .from(avroFile.getPath()), - "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .withSchema(schema) - .named("HerRead"), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .named("HerRead") - .withSchema(schema), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.withSchema(schema) - .named("HerRead") - .from(avroFile.getPath()), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.withSchema(schema) - .from(avroFile.getPath()) - .named("HerRead"), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).withSchema(schema), + "AvroIO.Read/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.withSchema(schema).from(avroFile.getPath()), + "AvroIO.Read/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schema), + "MyRead/Read.out", + 
generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.named("MyRead").withSchema(schema).from(avroFile.getPath()), + "MyRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).withSchema(schema).named("HerRead"), + "HerRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schema), + "HerRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.withSchema(schema).named("HerRead").from(avroFile.getPath()), + "HerRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.withSchema(schema).from(avroFile.getPath()).named("HerRead"), + "HerRead/Read.out", + generateAvroGenericRecords()); } @Test public void testReadFromSchemaString() throws Exception { - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .withSchema(schemaString), - "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.withSchema(schemaString) - .from(avroFile.getPath()), - "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.named("MyRead") - .from(avroFile.getPath()) - .withSchema(schemaString), - "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.named("MyRead") - .withSchema(schemaString) - .from(avroFile.getPath()), - "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .withSchema(schemaString) - .named("HerRead"), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.from(avroFile.getPath()) - .named("HerRead") - .withSchema(schemaString), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.withSchema(schemaString) - .named("HerRead") - .from(avroFile.getPath()), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); - runTestRead(AvroIO.Read.withSchema(schemaString) - 
.from(avroFile.getPath()) - .named("HerRead"), - "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString), + "AvroIO.Read/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()), + "AvroIO.Read/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schemaString), + "MyRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.named("MyRead").withSchema(schemaString).from(avroFile.getPath()), + "MyRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString).named("HerRead"), + "HerRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schemaString), + "HerRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.withSchema(schemaString).named("HerRead").from(avroFile.getPath()), + "HerRead/Read.out", + generateAvroGenericRecords()); + runTestRead( + AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()).named("HerRead"), + "HerRead/Read.out", + generateAvroGenericRecords()); } <T> void runTestWrite(AvroIO.Write.Bound<T> write, String expectedName) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01a0da02/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java index 30c578b..2258a91 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java @@ -16,19 +16,25 @@ package com.google.cloud.dataflow.sdk.io; +import static 
org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.DefaultCoder; import com.google.cloud.dataflow.sdk.runners.DirectPipeline; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; import org.junit.Rule; @@ -38,6 +44,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -104,7 +111,7 @@ public class AvroIOTest { } @Test - public void testAvroIOWriteAndRead() throws Throwable { + public void testAvroIOWriteAndReadASingleFile() throws Throwable { DirectPipeline p = DirectPipeline.createForTest(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); @@ -189,6 +196,31 @@ public class AvroIOTest { p.run(); } + @SuppressWarnings("deprecation") // using AvroCoder#createDatumReader for tests. 
+ @Test + public void testAvroSinkWrite() throws Exception { + String outputFilePrefix = new File(tmpFolder.getRoot(), "prefix").getAbsolutePath(); + String[] expectedElements = new String[] {"first", "second", "third"}; + + TestPipeline p = TestPipeline.create(); + p.apply(Create.<String>of(expectedElements)) + .apply(AvroIO.Write.to(outputFilePrefix).withSchema(String.class)); + p.run(); + + // Validate that the data written matches the expected elements in the expected order + String expectedName = + IOChannelUtils.constructName( + outputFilePrefix, ShardNameTemplate.INDEX_OF_MAX, "" /* no suffix */, 0, 1); + File outputFile = new File(expectedName); + assertTrue("Expected output file " + expectedName, outputFile.exists()); + try (DataFileReader<String> reader = + new DataFileReader<>(outputFile, AvroCoder.of(String.class).createDatumReader())) { + List<String> actualElements = new ArrayList<>(); + Iterators.addAll(actualElements, reader); + assertThat(actualElements, containsInAnyOrder(expectedElements)); + } + } + // TODO: for Write only, test withSuffix, withNumShards, // withShardNameTemplate and withoutSharding. }
