Revert "Migrate AvroIO.Write to a custom sink" ----Release Notes----
[] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115355272 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e89a4b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e89a4b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e89a4b3 Branch: refs/heads/master Commit: 2e89a4b3ddfd5d395aa8d6c4f73b501712b907a7 Parents: 635541a Author: sgmc <[email protected]> Authored: Tue Feb 23 10:21:54 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:26 2016 -0800 ---------------------------------------------------------------------- .../google/cloud/dataflow/sdk/io/AvroIO.java | 188 ++++++++++--------- .../sdk/runners/DataflowPipelineRunner.java | 179 ------------------ .../sdk/runners/DataflowPipelineTranslator.java | 5 + .../sdk/runners/dataflow/AvroIOTranslator.java | 87 +++++++++ .../sdk/io/AvroIOGeneratedClassTest.java | 48 ++--- .../cloud/dataflow/sdk/io/AvroIOTest.java | 45 ----- 6 files changed, 215 insertions(+), 337 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java index 7042010..9ee7e6b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java @@ -22,25 +22,24 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.io.Read.Bounded; -import 
com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import com.google.cloud.dataflow.sdk.runners.worker.AvroSink; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.MimeTypes; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.common.worker.Sink; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.ReflectData; import java.io.IOException; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; +import java.util.List; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -318,7 +317,7 @@ public class AvroIO { : com.google.cloud.dataflow.sdk.io.Read.from( AvroSource.from(filepattern).withSchema(type)); - PCollection<T> pcol = input.getPipeline().apply("Read", read); + PCollection<T> pcol = input.getPipeline().apply(read); // Honor the default output coder that would have been used by this PTransform. pcol.setCoder(getDefaultOutputCoder()); return pcol; @@ -474,6 +473,8 @@ public class AvroIO { final int numShards; /** Shard template string. */ final String shardTemplate; + /** Insert a shuffle before writing to decouple parallelism when numShards != 0. */ + final boolean forceReshard; /** The class type of the records. */ final Class<T> type; /** The schema of the output file. 
*/ @@ -483,16 +484,18 @@ public class AvroIO { final boolean validate; Bound(Class<T> type) { - this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, type, null, true); + this(null, null, "", 0, ShardNameTemplate.INDEX_OF_MAX, true, type, null, true); } Bound(String name, String filenamePrefix, String filenameSuffix, int numShards, - String shardTemplate, Class<T> type, Schema schema, boolean validate) { + String shardTemplate, boolean forceReshard, Class<T> type, Schema schema, + boolean validate) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; this.numShards = numShards; this.shardTemplate = shardTemplate; + this.forceReshard = forceReshard; this.type = type; this.schema = schema; this.validate = validate; @@ -506,7 +509,7 @@ public class AvroIO { */ public Bound<T> named(String name) { return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, type, schema, validate); } @@ -522,7 +525,7 @@ public class AvroIO { public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, type, schema, validate); } @@ -537,7 +540,7 @@ public class AvroIO { public Bound<T> withSuffix(String filenameSuffix) { validateOutputComponent(filenameSuffix); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, type, schema, validate); } @@ -556,9 +559,31 @@ public class AvroIO { * @see ShardNameTemplate */ public Bound<T> withNumShards(int numShards) { + return withNumShards(numShards, forceReshard); + } + + /** + * Returns a new {@link PTransform} that's like this one but + * that uses the provided shard count. 
+ * + * <p>Constraining the number of shards is likely to reduce + * the performance of a pipeline. If forceReshard is true, the output + * will be shuffled to obtain the desired sharding. If it is false, + * data will not be reshuffled, but parallelism of preceding stages + * may be constrained. Setting this value is not recommended + * unless you require a specific number of output files. + * + * <p>Does not modify this object. + * + * @param numShards the number of shards to use, or 0 to let the system + * decide. + * @param forceReshard whether to force a reshard to obtain the desired sharding. + * @see ShardNameTemplate + */ + private Bound<T> withNumShards(int numShards, boolean forceReshard) { Preconditions.checkArgument(numShards >= 0); return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, type, schema, validate); } @@ -572,7 +597,7 @@ public class AvroIO { */ public Bound<T> withShardNameTemplate(String shardTemplate) { return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, type, schema, validate); } @@ -586,7 +611,22 @@ public class AvroIO { * <p>Does not modify this object. */ public Bound<T> withoutSharding() { - return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", + return withoutSharding(forceReshard); + } + + /** + * Returns a new {@link PTransform} that's like this one but + * that forces a single file as output. + * + * <p>This is a shortcut for + * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")} + * + * <p>Does not modify this object. + * + * @param forceReshard whether to force a reshard to obtain the desired sharding. 
+ */ + private Bound<T> withoutSharding(boolean forceReshard) { + return new Bound<>(name, filenamePrefix, filenameSuffix, 1, "", forceReshard, type, schema, validate); } @@ -601,7 +641,7 @@ public class AvroIO { */ public <X> Bound<X> withSchema(Class<X> type) { return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, - type, ReflectData.get().getSchema(type), validate); + forceReshard, type, ReflectData.get().getSchema(type), validate); } /** @@ -613,7 +653,7 @@ public class AvroIO { */ public Bound<GenericRecord> withSchema(Schema schema) { return new Bound<>(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, - GenericRecord.class, schema, validate); + forceReshard, GenericRecord.class, schema, validate); } /** @@ -639,7 +679,7 @@ public class AvroIO { */ public Bound<T> withoutValidation() { return new Bound<>( - name, filenamePrefix, filenameSuffix, numShards, shardTemplate, + name, filenamePrefix, filenameSuffix, numShards, shardTemplate, forceReshard, type, schema, false); } @@ -653,12 +693,14 @@ public class AvroIO { throw new IllegalStateException("need to set the schema of an AvroIO.Write transform"); } - // Note that custom sinks currently do not expose sharding controls. - // Thus pipeline runner writers need to individually add support internally to - // apply user requested sharding limits. - return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( - new AvroSink<>( - filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)))); + if (numShards > 0 && forceReshard) { + // Reshard and re-apply a version of this write without resharding. 
+ return input + .apply(new FileBasedSink.ReshardForWrite<T>()) + .apply(withNumShards(numShards, false)); + } else { + return PDone.in(input.getPipeline()); + } } /** @@ -700,6 +742,21 @@ public class AvroIO { public boolean needsValidation() { return validate; } + + static { + @SuppressWarnings("rawtypes") + DirectPipelineRunner.TransformEvaluator<Bound> transformEvaluator = + new DirectPipelineRunner.TransformEvaluator<Bound>() { + @Override + @SuppressWarnings("unchecked") + public void evaluate( + Bound transform, DirectPipelineRunner.EvaluationContext context) { + evaluateWriteHelper(transform, context); + } + }; + DirectPipelineRunner.registerDefaultTransformEvaluator( + Bound.class, transformEvaluator); + } } /** Disallow construction of utility class. */ @@ -722,72 +779,25 @@ public class AvroIO { /** Disallow construction of utility class. */ private AvroIO() {} - /** - * A {@link FileBasedSink} for Avro files. - */ - @VisibleForTesting - static class AvroSink<T> extends FileBasedSink<T> { - private final AvroCoder<T> coder; - - @VisibleForTesting - AvroSink( - String baseOutputFilename, String extension, String fileNameTemplate, AvroCoder<T> coder) { - super(baseOutputFilename, extension, fileNameTemplate); - this.coder = coder; - } - - @Override - public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new AvroWriteOperation<>(this, coder); - } - - /** - * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation - * FileBasedWriteOperation} for Avro files. 
- */ - private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> { - private final AvroCoder<T> coder; - - private AvroWriteOperation(AvroSink<T> sink, AvroCoder<T> coder) { - super(sink); - this.coder = coder; - } - - @Override - public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { - return new AvroWriter<>(this, coder); - } + private static <T> void evaluateWriteHelper( + Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext context) { + List<WindowedValue<T>> elems = + context.getPCollectionWindowedValues(context.getInput(transform)); + int numShards = transform.numShards; + if (numShards < 1) { + // System gets to choose. For direct mode, choose 1. + numShards = 1; } - - /** - * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} - * for Avro files. - */ - private static class AvroWriter<T> extends FileBasedWriter<T> { - private final AvroCoder<T> coder; - private DataFileWriter<T> dataFileWriter; - - public AvroWriter(FileBasedWriteOperation<T> writeOperation, AvroCoder<T> coder) { - super(writeOperation); - this.mimeType = MimeTypes.BINARY; - this.coder = coder; - } - - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - dataFileWriter = new DataFileWriter<>(coder.createDatumWriter()); - dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel)); - } - - @Override - public void write(T value) throws Exception { - dataFileWriter.append(value); - } - - @Override - protected void writeFooter() throws Exception { - dataFileWriter.close(); - } + AvroSink<T> writer = new AvroSink<>(transform.filenamePrefix, transform.shardTemplate, + transform.filenameSuffix, numShards, + WindowedValue.getValueOnlyCoder(AvroCoder.of(transform.type, transform.schema))); + try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) { + for (WindowedValue<T> elem : elems) { + sink.add(elem); + } + } catch (IOException exn) { + 
throw new RuntimeException( + "unable to write to output file \"" + transform.filenamePrefix + "\"", exn); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 5a57f7f..f20caa3 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -32,7 +32,6 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; import com.google.cloud.dataflow.sdk.PipelineResult.State; import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder; import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -49,10 +48,8 @@ import com.google.cloud.dataflow.sdk.coders.VarIntCoder; import com.google.cloud.dataflow.sdk.coders.VarLongCoder; import com.google.cloud.dataflow.sdk.io.AvroIO; import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.FileBasedSink; import com.google.cloud.dataflow.sdk.io.PubsubIO; import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.io.UnboundedSource; import com.google.cloud.dataflow.sdk.io.Write; @@ -63,8 +60,6 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; import 
com.google.cloud.dataflow.sdk.options.StreamingOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.JobSpecification; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; import com.google.cloud.dataflow.sdk.runners.dataflow.AssignWindows; import com.google.cloud.dataflow.sdk.runners.dataflow.DataflowAggregatorTransforms; import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator; @@ -88,7 +83,6 @@ import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; import com.google.cloud.dataflow.sdk.transforms.WithKeys; import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; @@ -337,8 +331,6 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.<Class<?>, Class<?>>builder(); builder.put(Read.Unbounded.class, UnsupportedIO.class); builder.put(Window.Bound.class, AssignWindows.class); - builder.put(Write.Bound.class, BatchWrite.class); - builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class); if (options.getExperiments() == null || !options.getExperiments().contains("disable_ism_side_input")) { builder.put(View.AsMap.class, BatchViewAsMap.class); @@ -1927,177 +1919,6 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> } /** - * A {@link PTransform} that uses shuffle to create a fusion break. This allows pushing - * parallelism limits such as sharding controls further down the pipeline. 
- */ - private static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> { - @Override - public PCollection<T> apply(PCollection<T> input) { - return input - // TODO: This would need to be adapted to write per-window shards. - .apply(Window.<T>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()) - .apply("RandomKey", ParDo.of( - new DoFn<T, KV<Long, T>>() { - transient long counter, step; - @Override - public void startBundle(Context c) { - counter = (long) (Math.random() * Long.MAX_VALUE); - step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE); - } - @Override - public void processElement(ProcessContext c) { - counter += step; - c.output(KV.of(counter, c.element())); - } - })) - .apply(GroupByKey.<Long, T>create()) - .apply("Ungroup", ParDo.of( - new DoFn<KV<Long, Iterable<T>>, T>() { - @Override - public void processElement(ProcessContext c) { - for (T item : c.element().getValue()) { - c.output(item); - } - } - })); - } - } - - /** - * Specialized implementation which overrides - * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound} to provide Google - * Cloud Dataflow specific path validation of {@link FileBasedSink}s. - */ - private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> { - private final DataflowPipelineRunner runner; - private final Write.Bound<T> transform; - /** - * Builds an instance of this class from the overridden transform. 
- */ - @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() - public BatchWrite(DataflowPipelineRunner runner, Write.Bound<T> transform) { - this.runner = runner; - this.transform = transform; - } - - @Override - public PDone apply(PCollection<T> input) { - if (transform.getSink() instanceof FileBasedSink) { - FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink(); - PathValidator validator = runner.options.getPathValidator(); - validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename()); - } - return transform.apply(input); - } - } - - /** - * Specialized implementation which overrides - * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound AvroIO.Write.Bound} with - * a native sink instead of a custom sink as workaround until custom sinks - * have support for sharding controls. - */ - private static class BatchAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> { - private final AvroIO.Write.Bound<T> transform; - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() - public BatchAvroIOWrite(DataflowPipelineRunner runner, AvroIO.Write.Bound<T> transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection<T> input) { - if (transform.getNumShards() > 0) { - return input - .apply(new ReshardForWrite<T>()) - .apply(new BatchAvroIONativeWrite<>(transform)); - } else { - return transform.apply(input); - } - } - } - - /** - * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way - * to provide the native definition of the Avro sink. 
- */ - private static class BatchAvroIONativeWrite<T> extends PTransform<PCollection<T>, PDone> { - private final AvroIO.Write.Bound<T> transform; - public BatchAvroIONativeWrite(AvroIO.Write.Bound<T> transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection<T> input) { - return PDone.in(input.getPipeline()); - } - - static { - DataflowPipelineTranslator.registerTransformTranslator( - BatchAvroIONativeWrite.class, new BatchAvroIONativeWriteTranslator()); - } - } - - /** - * AvroIO.Write.Bound support code for the Dataflow backend when applying parallelism limits - * through user requested sharding limits. - */ - private static class BatchAvroIONativeWriteTranslator - implements TransformTranslator<BatchAvroIONativeWrite<?>> { - @SuppressWarnings("unchecked") - @Override - public void translate(@SuppressWarnings("rawtypes") BatchAvroIONativeWrite transform, - TranslationContext context) { - translateWriteHelper(transform, transform.transform, context); - } - - private <T> void translateWriteHelper( - BatchAvroIONativeWrite<T> transform, - AvroIO.Write.Bound<T> originalTransform, - TranslationContext context) { - // Note that the original transform can not be used during add step/add input - // and is only passed in to get properties from it. - - checkState(originalTransform.getNumShards() > 0, - "Native AvroSink is expected to only be used when sharding controls are required."); - - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - - // TODO: drop this check when server supports alternative templates. - switch (originalTransform.getShardTemplate()) { - case ShardNameTemplate.INDEX_OF_MAX: - break; // supported by server - case "": - // Empty shard template allowed - forces single output. 
- Preconditions.checkArgument(originalTransform.getNumShards() <= 1, - "Num shards must be <= 1 when using an empty sharding template"); - break; - default: - throw new UnsupportedOperationException("Shard template " - + originalTransform.getShardTemplate() - + " not yet supported by Dataflow service"); - } - - context.addInput(PropertyNames.FORMAT, "avro"); - context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix()); - context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, originalTransform.getShardTemplate()); - context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); - context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); - - context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); - - context.addEncodingInput( - WindowedValue.getValueOnlyCoder( - AvroCoder.of(originalTransform.getType(), originalTransform.getSchema()))); - } - } - - /** * Specialized (non-)implementation for * {@link com.google.cloud.dataflow.sdk.io.Write.Bound Write.Bound} * for the Dataflow runner in streaming mode. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index 885260e..f7217f7 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.IterableCoder; +import com.google.cloud.dataflow.sdk.io.AvroIO; import com.google.cloud.dataflow.sdk.io.BigQueryIO; import com.google.cloud.dataflow.sdk.io.PubsubIO; import com.google.cloud.dataflow.sdk.io.Read; @@ -48,6 +49,7 @@ import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.StreamingOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; +import com.google.cloud.dataflow.sdk.runners.dataflow.AvroIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator; @@ -1027,6 +1029,9 @@ public class DataflowPipelineTranslator { // IO Translation. 
registerTransformTranslator( + AvroIO.Write.Bound.class, new AvroIOTranslator.WriteTranslator()); + + registerTransformTranslator( BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator()); registerTransformTranslator( BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java new file mode 100644 index 0000000..b114021 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/AvroIOTranslator.java @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.runners.dataflow; + +import com.google.cloud.dataflow.sdk.coders.AvroCoder; +import com.google.cloud.dataflow.sdk.io.AvroIO; +import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; +import com.google.cloud.dataflow.sdk.util.PathValidator; +import com.google.cloud.dataflow.sdk.util.PropertyNames; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.common.base.Preconditions; + +/** + * Avro transform support code for the Dataflow backend. + */ +public class AvroIOTranslator { + + /** + * Implements AvroIO Write translation for the Dataflow backend. + */ + @SuppressWarnings("rawtypes") + public static class WriteTranslator implements TransformTranslator<AvroIO.Write.Bound> { + + @Override + public void translate( + AvroIO.Write.Bound transform, + TranslationContext context) { + translateWriteHelper(transform, context); + } + + private <T> void translateWriteHelper( + AvroIO.Write.Bound<T> transform, + TranslationContext context) { + PathValidator validator = context.getPipelineOptions().getPathValidator(); + String filenamePrefix = validator.validateOutputFilePrefixSupported( + transform.getFilenamePrefix()); + context.addStep(transform, "ParallelWrite"); + context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + + // TODO: drop this check when server supports alternative templates. + switch (transform.getShardTemplate()) { + case ShardNameTemplate.INDEX_OF_MAX: + break; // supported by server + case "": + // Empty shard template allowed - forces single output. 
+ Preconditions.checkArgument(transform.getNumShards() <= 1, + "Num shards must be <= 1 when using an empty sharding template"); + break; + default: + throw new UnsupportedOperationException("Shard template " + + transform.getShardTemplate() + + " not yet supported by Dataflow service"); + } + + context.addInput(PropertyNames.FORMAT, "avro"); + context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix); + context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, transform.getShardTemplate()); + context.addInput(PropertyNames.FILENAME_SUFFIX, transform.getFilenameSuffix()); + context.addInput(PropertyNames.VALIDATE_SINK, transform.needsValidation()); + + long numShards = transform.getNumShards(); + if (numShards > 0) { + context.addInput(PropertyNames.NUM_SHARDS, numShards); + } + + context.addEncodingInput( + WindowedValue.getValueOnlyCoder( + AvroCoder.of(transform.getType(), transform.getSchema()))); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java index 927d3a3..6bb459d 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOGeneratedClassTest.java @@ -148,102 +148,102 @@ public class AvroIOGeneratedClassTest { public void testReadFromGeneratedClass() throws Exception { runTestRead(AvroIO.Read.from(avroFile.getPath()) .withSchema(AvroGeneratedUser.class), - "AvroIO.Read/Read.out", generateAvroObjects()); + "AvroIO.Read/Read(AvroSource).out", generateAvroObjects()); runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class) .from(avroFile.getPath()), - "AvroIO.Read/Read.out", generateAvroObjects()); + 
"AvroIO.Read/Read(AvroSource).out", generateAvroObjects()); runTestRead(AvroIO.Read.named("MyRead") .from(avroFile.getPath()) .withSchema(AvroGeneratedUser.class), - "MyRead/Read.out", generateAvroObjects()); + "MyRead/Read(AvroSource).out", generateAvroObjects()); runTestRead(AvroIO.Read.named("MyRead") .withSchema(AvroGeneratedUser.class) .from(avroFile.getPath()), - "MyRead/Read.out", generateAvroObjects()); + "MyRead/Read(AvroSource).out", generateAvroObjects()); runTestRead(AvroIO.Read.from(avroFile.getPath()) .withSchema(AvroGeneratedUser.class) .named("HerRead"), - "HerRead/Read.out", generateAvroObjects()); + "HerRead/Read(AvroSource).out", generateAvroObjects()); runTestRead(AvroIO.Read.from(avroFile.getPath()) .named("HerRead") .withSchema(AvroGeneratedUser.class), - "HerRead/Read.out", generateAvroObjects()); + "HerRead/Read(AvroSource).out", generateAvroObjects()); runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class) .named("HerRead") .from(avroFile.getPath()), - "HerRead/Read.out", generateAvroObjects()); + "HerRead/Read(AvroSource).out", generateAvroObjects()); runTestRead(AvroIO.Read.withSchema(AvroGeneratedUser.class) .from(avroFile.getPath()) .named("HerRead"), - "HerRead/Read.out", generateAvroObjects()); + "HerRead/Read(AvroSource).out", generateAvroObjects()); } @Test public void testReadFromSchema() throws Exception { runTestRead(AvroIO.Read.from(avroFile.getPath()) .withSchema(schema), - "AvroIO.Read/Read.out", generateAvroGenericRecords()); + "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.withSchema(schema) .from(avroFile.getPath()), - "AvroIO.Read/Read.out", generateAvroGenericRecords()); + "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.named("MyRead") .from(avroFile.getPath()) .withSchema(schema), - "MyRead/Read.out", generateAvroGenericRecords()); + "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); 
runTestRead(AvroIO.Read.named("MyRead") .withSchema(schema) .from(avroFile.getPath()), - "MyRead/Read.out", generateAvroGenericRecords()); + "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.from(avroFile.getPath()) .withSchema(schema) .named("HerRead"), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.from(avroFile.getPath()) .named("HerRead") .withSchema(schema), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.withSchema(schema) .named("HerRead") .from(avroFile.getPath()), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.withSchema(schema) .from(avroFile.getPath()) .named("HerRead"), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); } @Test public void testReadFromSchemaString() throws Exception { runTestRead(AvroIO.Read.from(avroFile.getPath()) .withSchema(schemaString), - "AvroIO.Read/Read.out", generateAvroGenericRecords()); + "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.withSchema(schemaString) .from(avroFile.getPath()), - "AvroIO.Read/Read.out", generateAvroGenericRecords()); + "AvroIO.Read/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.named("MyRead") .from(avroFile.getPath()) .withSchema(schemaString), - "MyRead/Read.out", generateAvroGenericRecords()); + "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.named("MyRead") .withSchema(schemaString) .from(avroFile.getPath()), - "MyRead/Read.out", generateAvroGenericRecords()); + "MyRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.from(avroFile.getPath()) .withSchema(schemaString) 
.named("HerRead"), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.from(avroFile.getPath()) .named("HerRead") .withSchema(schemaString), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.withSchema(schemaString) .named("HerRead") .from(avroFile.getPath()), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); runTestRead(AvroIO.Read.withSchema(schemaString) .from(avroFile.getPath()) .named("HerRead"), - "HerRead/Read.out", generateAvroGenericRecords()); + "HerRead/Read(AvroSource).out", generateAvroGenericRecords()); } <T> void runTestWrite(AvroIO.Write.Bound<T> write, String expectedName) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e89a4b3/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java index 207b2ba..30c578b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java @@ -16,28 +16,19 @@ package com.google.cloud.dataflow.sdk.io; -import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.coders.DefaultCoder; -import com.google.cloud.dataflow.sdk.io.AvroIO.AvroSink; -import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation; -import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter; -import 
com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.runners.DirectPipeline; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; -import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; import org.junit.Rule; @@ -47,10 +38,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.File; -import java.nio.channels.FileChannel; -import java.nio.channels.WritableByteChannel; -import java.nio.file.StandardOpenOption; -import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -202,38 +189,6 @@ public class AvroIOTest { p.run(); } - @Test - public void testAvroSinkWrite() throws Exception { - String[] expectedElements = new String[]{ "first", "second", "third" }; - PipelineOptions options = PipelineOptionsFactory.create(); - AvroCoder<String> coder = AvroCoder.of(String.class); - File tmpFile = tmpFolder.newFile(); - AvroSink<String> avroSink = new AvroSink<>( - "prefix", "suffix", ShardNameTemplate.INDEX_OF_MAX, coder); - FileBasedWriteOperation<String> writeOperation = avroSink.createWriteOperation(options); - FileBasedWriter<String> writer = writeOperation.createWriter(options); - - WritableByteChannel channel = FileChannel.open(tmpFile.toPath(), StandardOpenOption.WRITE); - writer.prepareWrite(channel); - writer.writeHeader(); - for (String element : expectedElements) { - writer.write(element); - // We expect the channel to remain open - assertTrue(channel.isOpen()); - } - - writer.close(); - // Ensure that we properly close the channel - assertFalse(channel.isOpen()); - - // Validate 
that the data written matches the expected elements in the expected order - try (DataFileReader<String> reader = new DataFileReader<>(tmpFile, coder.createDatumReader())) { - List<String> actualElements = new ArrayList<>(); - Iterators.addAll(actualElements, reader); - assertThat(actualElements, contains(expectedElements)); - } - } - // TODO: for Write only, test withSuffix, withNumShards, // withShardNameTemplate and withoutSharding. }
