Migrate TextIO.Write to a custom sink. Note: custom sinks do not currently expose sharding controls, so for user-requested sharding limits to be supported, each pipeline runner must apply those sharding limits itself.
Google Cloud Dataflow supports sharding limits. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115310814 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7b5189c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7b5189c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7b5189c Branch: refs/heads/master Commit: d7b5189c5708b48308060dd40d6f3ab073759d28 Parents: 6b372ec Author: lcwik <[email protected]> Authored: Mon Feb 22 23:59:46 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:26 2016 -0800 ---------------------------------------------------------------------- .../google/cloud/dataflow/sdk/io/TextIO.java | 187 +++++++++---------- .../sdk/runners/DataflowPipelineRunner.java | 108 ++++++++++- .../sdk/runners/DataflowPipelineTranslator.java | 5 - .../sdk/runners/dataflow/TextIOTranslator.java | 91 --------- .../cloud/dataflow/sdk/io/TextIOTest.java | 22 --- .../sdk/runners/DataflowPipelineRunnerTest.java | 21 +-- .../runners/DataflowPipelineTranslatorTest.java | 4 +- .../dataflow/sdk/runners/TransformTreeTest.java | 9 +- 8 files changed, 209 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java index 0bb2861..d342f25 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java @@ -26,11 +26,9 @@ import com.google.cloud.dataflow.sdk.io.Read.Bounded; import 
com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.worker.TextSink; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.common.worker.Sink; +import com.google.cloud.dataflow.sdk.util.MimeTypes; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; @@ -39,10 +37,13 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; -import java.util.List; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.util.NoSuchElementException; import java.util.regex.Pattern; @@ -66,7 +67,7 @@ import javax.annotation.Nullable; * * <p>See the following examples: * - * <pre> {@code + * <pre>{@code * Pipeline p = ...; * * // A simple Read of a local file (only runs locally): @@ -79,7 +80,7 @@ import javax.annotation.Nullable; * p.apply(TextIO.Read.named("ReadNumbers") * .from("gs://my_bucket/path/to/numbers-*.txt") * .withCoder(TextualIntegerCoder.of())); - * } </pre> + * }</pre> * * <p>To write a {@link PCollection} to one or more text files, use * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify @@ -94,7 +95,7 @@ import javax.annotation.Nullable; * will be overwritten. 
* * <p>For example: - * <pre> {@code + * <pre>{@code * // A simple Write to a local file (only runs locally): * PCollection<String> lines = ...; * lines.apply(TextIO.Write.to("/path/to/file.txt")); @@ -106,7 +107,7 @@ import javax.annotation.Nullable; * .to("gs://my_bucket/path/to/numbers") * .withSuffix(".txt") * .withCoder(TextualIntegerCoder.of())); - * } </pre> + * }</pre> * * <h3>Permissions</h3> * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files @@ -477,9 +478,6 @@ public class TextIO { /** Requested number of shards. 0 for automatic. */ private final int numShards; - /** Insert a shuffle before writing to decouple parallelism when numShards != 0. */ - private final boolean forceReshard; - /** The shard template of each file written, combined with prefix and suffix. */ private final String shardTemplate; @@ -487,17 +485,16 @@ public class TextIO { private final boolean validate; Bound(Coder<T> coder) { - this(null, null, "", coder, 0, true, ShardNameTemplate.INDEX_OF_MAX, true); + this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true); } private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder, - int numShards, boolean forceReshard, String shardTemplate, boolean validate) { + int numShards, String shardTemplate, boolean validate) { super(name); this.coder = coder; this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; this.numShards = numShards; - this.forceReshard = forceReshard; this.shardTemplate = shardTemplate; this.validate = validate; } @@ -510,7 +507,7 @@ public class TextIO { */ public Bound<T> named(String name) { return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, - forceReshard, shardTemplate, validate); + shardTemplate, validate); } /** @@ -523,7 +520,7 @@ public class TextIO { */ public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Bound<>(name, filenamePrefix, 
filenameSuffix, coder, numShards, forceReshard, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, shardTemplate, validate); } @@ -537,7 +534,7 @@ public class TextIO { */ public Bound<T> withSuffix(String nameExtension) { validateOutputComponent(nameExtension); - return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards, forceReshard, + return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards, shardTemplate, validate); } @@ -556,30 +553,8 @@ public class TextIO { * @see ShardNameTemplate */ public Bound<T> withNumShards(int numShards) { - return withNumShards(numShards, forceReshard); - } - - /** - * Returns a transform for writing to text files that's like this one but - * that uses the provided shard count. - * - * <p>Constraining the number of shards is likely to reduce - * the performance of a pipeline. If forceReshard is true, the output - * will be shuffled to obtain the desired sharding. If it is false, - * data will not be reshuffled, but parallelism of preceeding stages - * may be constrained. Setting this value is not recommended - * unless you require a specific number of output files. - * - * <p>Does not modify this object. - * - * @param numShards the number of shards to use, or 0 to let the system - * decide. - * @param forceReshard whether to force a reshard to obtain the desired sharding. 
- * @see ShardNameTemplate - */ - private Bound<T> withNumShards(int numShards, boolean forceReshard) { Preconditions.checkArgument(numShards >= 0); - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, shardTemplate, validate); } @@ -592,7 +567,7 @@ public class TextIO { * @see ShardNameTemplate */ public Bound<T> withShardNameTemplate(String shardTemplate) { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, shardTemplate, validate); } @@ -610,25 +585,7 @@ public class TextIO { * <p>Does not modify this object. */ public Bound<T> withoutSharding() { - return withoutSharding(forceReshard); - } - - /** - * Returns a transform for writing to text files that's like this one but - * that forces a single file as output. - * - * <p>Constraining the number of shards is likely to reduce - * the performance of a pipeline. Using this setting is not recommended - * unless you truly require a single output file. - * - * <p>This is a shortcut for - * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")} - * - * <p>Does not modify this object. - */ - private Bound<T> withoutSharding(boolean forceReshard) { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, forceReshard, "", - validate); + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", validate); } /** @@ -640,7 +597,7 @@ public class TextIO { * @param <X> the type of the elements of the input {@link PCollection} */ public <X> Bound<X> withCoder(Coder<X> coder) { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, shardTemplate, validate); } @@ -655,7 +612,7 @@ public class TextIO { * <p>Does not modify this object. 
*/ public Bound<T> withoutValidation() { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, shardTemplate, false); } @@ -665,14 +622,13 @@ public class TextIO { throw new IllegalStateException( "need to set the filename prefix of a TextIO.Write transform"); } - if (numShards > 0 && forceReshard) { - // Reshard and re-apply a version of this write without resharding. - return input - .apply(new FileBasedSink.ReshardForWrite<T>()) - .apply(withNumShards(numShards, false)); - } else { - return PDone.in(input.getPipeline()); - } + + // Note that custom sinks currently do not expose sharding controls. + // Thus pipeline runner writers need to individually add support internally to + // apply user requested sharding limits. + return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( + new TextSink<>( + filenamePrefix, filenameSuffix, shardTemplate, coder))); } /** @@ -710,17 +666,6 @@ public class TextIO { public boolean needsValidation() { return validate; } - - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() { - @Override - public void evaluate( - Bound transform, DirectPipelineRunner.EvaluationContext context) { - evaluateWriteHelper(transform, context); - } - }); - } } } @@ -978,24 +923,70 @@ public class TextIO { } } - private static <T> void evaluateWriteHelper( - Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext context) { - List<T> elems = context.getPCollection(context.getInput(transform)); - int numShards = transform.numShards; - if (numShards < 1) { - // System gets to choose. For direct mode, choose 1. - numShards = 1; + /** + * A {@link FileBasedSink} for text files. Produces text files with the new line separator + * {@code '\n'} represented in {@code UTF-8} format as the record separator. 
+ * Each record (including the last) is terminated. + */ + @VisibleForTesting + static class TextSink<T> extends FileBasedSink<T> { + private final Coder<T> coder; + + @VisibleForTesting + TextSink( + String baseOutputFilename, String extension, String fileNameTemplate, Coder<T> coder) { + super(baseOutputFilename, extension, fileNameTemplate); + this.coder = coder; } - TextSink<WindowedValue<T>> writer = TextSink.createForDirectPipelineRunner( - transform.filenamePrefix, transform.getShardNameTemplate(), transform.filenameSuffix, - numShards, true, null, null, transform.coder); - try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) { - for (T elem : elems) { - sink.add(WindowedValue.valueInGlobalWindow(elem)); - } - } catch (IOException exn) { - throw new RuntimeException( - "unable to write to output file \"" + transform.filenamePrefix + "\"", exn); + + @Override + public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { + return new TextWriteOperation<>(this, coder); + } + + /** + * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation + * FileBasedWriteOperation} for text files. + */ + private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> { + private final Coder<T> coder; + + private TextWriteOperation(TextSink<T> sink, Coder<T> coder) { + super(sink); + this.coder = coder; + } + + @Override + public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { + return new TextWriter<>(this, coder); + } + } + + /** + * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} + * for text files. 
+ */ + private static class TextWriter<T> extends FileBasedWriter<T> { + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private final Coder<T> coder; + private OutputStream out; + + public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) { + super(writeOperation); + this.mimeType = MimeTypes.TEXT; + this.coder = coder; + } + + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { + out = Channels.newOutputStream(channel); + } + + @Override + public void write(T value) throws Exception { + coder.encode(value, out, Context.OUTER); + out.write(NEWLINE); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 5a57f7f..396d308 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -339,6 +339,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> builder.put(Window.Bound.class, AssignWindows.class); builder.put(Write.Bound.class, BatchWrite.class); builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class); + builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class); if (options.getExperiments() == null || !options.getExperiments().contains("disable_ism_side_input")) { builder.put(View.AsMap.class, BatchViewAsMap.class); @@ -1995,6 +1996,111 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> /** * Specialized implementation which overrides + * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write.Bound TextIO.Write.Bound} with 
+ * a native sink instead of a custom sink as workaround until custom sinks + * have support for sharding controls. + */ + private static class BatchTextIOWrite<T> extends PTransform<PCollection<T>, PDone> { + private final TextIO.Write.Bound<T> transform; + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() + public BatchTextIOWrite(DataflowPipelineRunner runner, TextIO.Write.Bound<T> transform) { + this.transform = transform; + } + + @Override + public PDone apply(PCollection<T> input) { + if (transform.getNumShards() > 0) { + return input + .apply(new ReshardForWrite<T>()) + .apply(new BatchTextIONativeWrite<>(transform)); + } else { + return transform.apply(input); + } + } + } + + /** + * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way + * to provide the native definition of the Text sink. + */ + private static class BatchTextIONativeWrite<T> extends PTransform<PCollection<T>, PDone> { + private final TextIO.Write.Bound<T> transform; + public BatchTextIONativeWrite(TextIO.Write.Bound<T> transform) { + this.transform = transform; + } + + @Override + public PDone apply(PCollection<T> input) { + return PDone.in(input.getPipeline()); + } + + static { + DataflowPipelineTranslator.registerTransformTranslator( + BatchTextIONativeWrite.class, new BatchTextIONativeWriteTranslator()); + } + } + + /** + * TextIO.Write.Bound support code for the Dataflow backend when applying parallelism limits + * through user requested sharding limits. 
+ */ + private static class BatchTextIONativeWriteTranslator + implements TransformTranslator<BatchTextIONativeWrite<?>> { + @SuppressWarnings("unchecked") + @Override + public void translate(@SuppressWarnings("rawtypes") BatchTextIONativeWrite transform, + TranslationContext context) { + translateWriteHelper(transform, transform.transform, context); + } + + private <T> void translateWriteHelper( + BatchTextIONativeWrite<T> transform, + TextIO.Write.Bound<T> originalTransform, + TranslationContext context) { + // Note that the original transform can not be used during add step/add input + // and is only passed in to get properties from it. + + checkState(originalTransform.getNumShards() > 0, + "Native TextSink is expected to only be used when sharding controls are required."); + + context.addStep(transform, "ParallelWrite"); + context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + + // TODO: drop this check when server supports alternative templates. + switch (originalTransform.getShardTemplate()) { + case ShardNameTemplate.INDEX_OF_MAX: + break; // supported by server + case "": + // Empty shard template allowed - forces single output. + Preconditions.checkArgument(originalTransform.getNumShards() <= 1, + "Num shards must be <= 1 when using an empty sharding template"); + break; + default: + throw new UnsupportedOperationException("Shard template " + + originalTransform.getShardTemplate() + + " not yet supported by Dataflow service"); + } + + // TODO: How do we want to specify format and + // format-specific properties? 
+ context.addInput(PropertyNames.FORMAT, "text"); + context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix()); + context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, + originalTransform.getShardNameTemplate()); + context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); + context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); + context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); + context.addEncodingInput( + WindowedValue.getValueOnlyCoder(originalTransform.getCoder())); + + } + } + + /** + * Specialized implementation which overrides * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound AvroIO.Write.Bound} with * a native sink instead of a custom sink as workaround until custom sinks * have support for sharding controls. @@ -2088,9 +2194,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, originalTransform.getShardTemplate()); context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); - context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); - context.addEncodingInput( WindowedValue.getValueOnlyCoder( AvroCoder.of(originalTransform.getType(), originalTransform.getSchema()))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index 885260e..22ec3bb 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -44,14 +44,12 @@ import com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.io.BigQueryIO; import com.google.cloud.dataflow.sdk.io.PubsubIO; import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.StreamingOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator; -import com.google.cloud.dataflow.sdk.runners.dataflow.TextIOTranslator; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -1037,9 +1035,6 @@ public class DataflowPipelineTranslator { DataflowPipelineRunner.StreamingPubsubIOWrite.class, new PubsubIOTranslator.WriteTranslator()); - registerTransformTranslator( - TextIO.Write.Bound.class, new TextIOTranslator.WriteTranslator()); - registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java deleted file mode 100644 index d6c96c3..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * 
Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.runners.dataflow; - -import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; -import com.google.cloud.dataflow.sdk.util.PathValidator; -import com.google.cloud.dataflow.sdk.util.PropertyNames; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.common.base.Preconditions; - -/** - * TextIO transform support code for the Dataflow backend. - */ -public class TextIOTranslator { - /** - * Implements TextIO Write translation for the Dataflow backend. 
- */ - @SuppressWarnings({"rawtypes", "unchecked"}) - public static class WriteTranslator implements TransformTranslator<TextIO.Write.Bound> { - @Override - public void translate( - TextIO.Write.Bound transform, - TranslationContext context) { - translateWriteHelper(transform, context); - } - - private <T> void translateWriteHelper( - TextIO.Write.Bound<T> transform, - TranslationContext context) { - if (context.getPipelineOptions().isStreaming()) { - throw new IllegalArgumentException("TextIO not supported in streaming mode."); - } - - PathValidator validator = context.getPipelineOptions().getPathValidator(); - String filenamePrefix = validator.validateOutputFilePrefixSupported( - transform.getFilenamePrefix()); - - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - - // TODO: drop this check when server supports alternative templates. - switch (transform.getShardTemplate()) { - case ShardNameTemplate.INDEX_OF_MAX: - break; // supported by server - case "": - // Empty shard template allowed - forces single output. - Preconditions.checkArgument(transform.getNumShards() <= 1, - "Num shards must be <= 1 when using an empty sharding template"); - break; - default: - throw new UnsupportedOperationException("Shard template " - + transform.getShardTemplate() - + " not yet supported by Dataflow service"); - } - - // TODO: How do we want to specify format and - // format-specific properties? 
- context.addInput(PropertyNames.FORMAT, "text"); - context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix); - context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, - transform.getShardNameTemplate()); - context.addInput(PropertyNames.FILENAME_SUFFIX, transform.getFilenameSuffix()); - context.addInput(PropertyNames.VALIDATE_SINK, transform.needsValidation()); - - long numShards = transform.getNumShards(); - if (numShards > 0) { - context.addInput(PropertyNames.NUM_SHARDS, numShards); - } - - context.addEncodingInput( - WindowedValue.getValueOnlyCoder(transform.getCoder())); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index 6ad81e4..0a8e381 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -264,28 +264,6 @@ public class TextIOTest { } @Test - public void testWriteSharded() throws IOException { - File outFolder = tmpFolder.newFolder(); - String filename = outFolder.toPath().resolve("output").toString(); - - Pipeline p = TestPipeline.create(); - - PCollection<String> input = - p.apply(Create.of(Arrays.asList(LINES_ARRAY)) - .withCoder(StringUtf8Coder.of())); - - input.apply(TextIO.Write.to(filename).withNumShards(2).withSuffix(".txt")); - - p.run(); - - String[] files = outFolder.list(); - - assertThat(Arrays.asList(files), - containsInAnyOrder("output-00000-of-00002.txt", - "output-00001-of-00002.txt")); - } - - @Test public void testWriteNamed() { { PTransform<PCollection<String>, PDone> transform1 = 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java index c7175cb..c5f2d3f 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java @@ -453,16 +453,12 @@ public class DataflowPipelineRunnerTest { @Test public void testNonGcsFilePathInWriteFailure() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")) - .apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file")); + Pipeline p = buildDataflowPipeline(buildPipelineOptions()); + PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); - p.run(); - assertValidJob(jobCaptor.getValue()); + pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file")); } @Test @@ -482,17 +478,12 @@ public class DataflowPipelineRunnerTest { @Test public void testMultiSlashGcsFileWritePath() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")) - .apply(TextIO.Write.named("WriteInvalidGcsFile") - .to("gs://bucket/tmp//file")); + Pipeline p = buildDataflowPipeline(buildPipelineOptions()); + PCollection<String> 
pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("consecutive slashes"); - p.run(); - assertValidJob(jobCaptor.getValue()); + pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file")); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index b9c94ad..72090a0 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -403,7 +403,7 @@ public class DataflowPipelineTranslatorTest { pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) .apply(ParDo.of(new NoOpFn())) .apply(new EmbeddedTransform(predefinedStep.clone())) - .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out")); + .apply(ParDo.of(new NoOpFn())); Job job = translator.translate( pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); @@ -456,7 +456,7 @@ public class DataflowPipelineTranslatorTest { Job job = translator.translate( pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); - assertEquals(3, job.getSteps().size()); + assertEquals(13, job.getSteps().size()); Step step = job.getSteps().get(1); assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME)); return step; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7b5189c/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java 
---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java index f1b7cd7..68e1db1 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java @@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.io.Write; import com.google.cloud.dataflow.sdk.transforms.Count; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; @@ -133,9 +134,12 @@ public class TransformTreeTest { assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); assertNotNull(node.getEnclosingNode()); assertTrue(node.isCompositeNode()); + } else if (transform instanceof Write.Bound) { + assertTrue(visited.add(TransformsSeen.WRITE)); + assertNotNull(node.getEnclosingNode()); + assertTrue(node.isCompositeNode()); } assertThat(transform, not(instanceOf(Read.Bounded.class))); - assertThat(transform, not(instanceOf(TextIO.Write.Bound.class))); } @Override @@ -151,10 +155,9 @@ public class TransformTreeTest { PTransform<?, ?> transform = node.getTransform(); // Pick is a composite, should not be visited here. assertThat(transform, not(instanceOf(Sample.SampleAny.class))); + assertThat(transform, not(instanceOf(Write.Bound.class))); if (transform instanceof Read.Bounded) { assertTrue(visited.add(TransformsSeen.READ)); - } else if (transform instanceof TextIO.Write.Bound) { - assertTrue(visited.add(TransformsSeen.WRITE)); } }
