Revert "Migrate TextIO.Write to a custom sink" ----Release Notes----
[] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115351833 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3904c907 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3904c907 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3904c907 Branch: refs/heads/master Commit: 3904c9074e66733686285d09ce5068d28f303dd8 Parents: 45f5951 Author: sgmc <[email protected]> Authored: Tue Feb 23 09:53:38 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:26 2016 -0800 ---------------------------------------------------------------------- .../google/cloud/dataflow/sdk/io/TextIO.java | 187 ++++++++++--------- .../sdk/runners/DataflowPipelineRunner.java | 108 +---------- .../sdk/runners/DataflowPipelineTranslator.java | 5 + .../sdk/runners/dataflow/TextIOTranslator.java | 91 +++++++++ .../cloud/dataflow/sdk/io/TextIOTest.java | 22 +++ .../sdk/runners/DataflowPipelineRunnerTest.java | 21 ++- .../runners/DataflowPipelineTranslatorTest.java | 4 +- .../dataflow/sdk/runners/TransformTreeTest.java | 9 +- 8 files changed, 238 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java index d342f25..0bb2861 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java @@ -26,9 +26,11 @@ import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import 
com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; +import com.google.cloud.dataflow.sdk.runners.worker.TextSink; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; -import com.google.cloud.dataflow.sdk.util.MimeTypes; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.common.worker.Sink; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; @@ -37,13 +39,10 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.NoSuchElementException; import java.util.regex.Pattern; @@ -67,7 +66,7 @@ import javax.annotation.Nullable; * * <p>See the following examples: * - * <pre>{@code + * <pre> {@code * Pipeline p = ...; * * // A simple Read of a local file (only runs locally): @@ -80,7 +79,7 @@ import javax.annotation.Nullable; * p.apply(TextIO.Read.named("ReadNumbers") * .from("gs://my_bucket/path/to/numbers-*.txt") * .withCoder(TextualIntegerCoder.of())); - * }</pre> + * } </pre> * * <p>To write a {@link PCollection} to one or more text files, use * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify @@ -95,7 +94,7 @@ import javax.annotation.Nullable; * will be overwritten. 
* * <p>For example: - * <pre>{@code + * <pre> {@code * // A simple Write to a local file (only runs locally): * PCollection<String> lines = ...; * lines.apply(TextIO.Write.to("/path/to/file.txt")); @@ -107,7 +106,7 @@ import javax.annotation.Nullable; * .to("gs://my_bucket/path/to/numbers") * .withSuffix(".txt") * .withCoder(TextualIntegerCoder.of())); - * }</pre> + * } </pre> * * <h3>Permissions</h3> * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files @@ -478,6 +477,9 @@ public class TextIO { /** Requested number of shards. 0 for automatic. */ private final int numShards; + /** Insert a shuffle before writing to decouple parallelism when numShards != 0. */ + private final boolean forceReshard; + /** The shard template of each file written, combined with prefix and suffix. */ private final String shardTemplate; @@ -485,16 +487,17 @@ public class TextIO { private final boolean validate; Bound(Coder<T> coder) { - this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true); + this(null, null, "", coder, 0, true, ShardNameTemplate.INDEX_OF_MAX, true); } private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder, - int numShards, String shardTemplate, boolean validate) { + int numShards, boolean forceReshard, String shardTemplate, boolean validate) { super(name); this.coder = coder; this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; this.numShards = numShards; + this.forceReshard = forceReshard; this.shardTemplate = shardTemplate; this.validate = validate; } @@ -507,7 +510,7 @@ public class TextIO { */ public Bound<T> named(String name) { return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, - shardTemplate, validate); + forceReshard, shardTemplate, validate); } /** @@ -520,7 +523,7 @@ public class TextIO { */ public Bound<T> to(String filenamePrefix) { validateOutputComponent(filenamePrefix); - return new Bound<>(name, filenamePrefix, 
filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, shardTemplate, validate); } @@ -534,7 +537,7 @@ public class TextIO { */ public Bound<T> withSuffix(String nameExtension) { validateOutputComponent(nameExtension); - return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards, + return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards, forceReshard, shardTemplate, validate); } @@ -553,8 +556,30 @@ public class TextIO { * @see ShardNameTemplate */ public Bound<T> withNumShards(int numShards) { + return withNumShards(numShards, forceReshard); + } + + /** + * Returns a transform for writing to text files that's like this one but + * that uses the provided shard count. + * + * <p>Constraining the number of shards is likely to reduce + * the performance of a pipeline. If forceReshard is true, the output + * will be shuffled to obtain the desired sharding. If it is false, + * data will not be reshuffled, but parallelism of preceding stages + * may be constrained. Setting this value is not recommended + * unless you require a specific number of output files. + * + * <p>Does not modify this object. + * + * @param numShards the number of shards to use, or 0 to let the system + * decide. + * @param forceReshard whether to force a reshard to obtain the desired sharding. 
+ * @see ShardNameTemplate + */ + private Bound<T> withNumShards(int numShards, boolean forceReshard) { Preconditions.checkArgument(numShards >= 0); - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, shardTemplate, validate); } @@ -567,7 +592,7 @@ public class TextIO { * @see ShardNameTemplate */ public Bound<T> withShardNameTemplate(String shardTemplate) { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, shardTemplate, validate); } @@ -585,7 +610,25 @@ public class TextIO { * <p>Does not modify this object. */ public Bound<T> withoutSharding() { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", validate); + return withoutSharding(forceReshard); + } + + /** + * Returns a transform for writing to text files that's like this one but + * that forces a single file as output. + * + * <p>Constraining the number of shards is likely to reduce + * the performance of a pipeline. Using this setting is not recommended + * unless you truly require a single output file. + * + * <p>This is a shortcut for + * {@code .withNumShards(1, forceReshard).withShardNameTemplate("")} + * + * <p>Does not modify this object. + */ + private Bound<T> withoutSharding(boolean forceReshard) { + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, forceReshard, "", + validate); } /** @@ -597,7 +640,7 @@ public class TextIO { * @param <X> the type of the elements of the input {@link PCollection} */ public <X> Bound<X> withCoder(Coder<X> coder) { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, shardTemplate, validate); } @@ -612,7 +655,7 @@ public class TextIO { * <p>Does not modify this object. 
*/ public Bound<T> withoutValidation() { - return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, + return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards, forceReshard, shardTemplate, false); } @@ -622,13 +665,14 @@ public class TextIO { throw new IllegalStateException( "need to set the filename prefix of a TextIO.Write transform"); } - - // Note that custom sinks currently do not expose sharding controls. - // Thus pipeline runner writers need to individually add support internally to - // apply user requested sharding limits. - return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( - new TextSink<>( - filenamePrefix, filenameSuffix, shardTemplate, coder))); + if (numShards > 0 && forceReshard) { + // Reshard and re-apply a version of this write without resharding. + return input + .apply(new FileBasedSink.ReshardForWrite<T>()) + .apply(withNumShards(numShards, false)); + } else { + return PDone.in(input.getPipeline()); + } } /** @@ -666,6 +710,17 @@ public class TextIO { public boolean needsValidation() { return validate; } + + static { + DirectPipelineRunner.registerDefaultTransformEvaluator( + Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() { + @Override + public void evaluate( + Bound transform, DirectPipelineRunner.EvaluationContext context) { + evaluateWriteHelper(transform, context); + } + }); + } } } @@ -923,70 +978,24 @@ public class TextIO { } } - /** - * A {@link FileBasedSink} for text files. Produces text files with the new line separator - * {@code '\n'} represented in {@code UTF-8} format as the record separator. - * Each record (including the last) is terminated. 
- */ - @VisibleForTesting - static class TextSink<T> extends FileBasedSink<T> { - private final Coder<T> coder; - - @VisibleForTesting - TextSink( - String baseOutputFilename, String extension, String fileNameTemplate, Coder<T> coder) { - super(baseOutputFilename, extension, fileNameTemplate); - this.coder = coder; + private static <T> void evaluateWriteHelper( + Write.Bound<T> transform, DirectPipelineRunner.EvaluationContext context) { + List<T> elems = context.getPCollection(context.getInput(transform)); + int numShards = transform.numShards; + if (numShards < 1) { + // System gets to choose. For direct mode, choose 1. + numShards = 1; } - - @Override - public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) { - return new TextWriteOperation<>(this, coder); - } - - /** - * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation - * FileBasedWriteOperation} for text files. - */ - private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> { - private final Coder<T> coder; - - private TextWriteOperation(TextSink<T> sink, Coder<T> coder) { - super(sink); - this.coder = coder; - } - - @Override - public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception { - return new TextWriter<>(this, coder); - } - } - - /** - * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter} - * for text files. 
- */ - private static class TextWriter<T> extends FileBasedWriter<T> { - private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final Coder<T> coder; - private OutputStream out; - - public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) { - super(writeOperation); - this.mimeType = MimeTypes.TEXT; - this.coder = coder; - } - - @Override - protected void prepareWrite(WritableByteChannel channel) throws Exception { - out = Channels.newOutputStream(channel); - } - - @Override - public void write(T value) throws Exception { - coder.encode(value, out, Context.OUTER); - out.write(NEWLINE); - } + TextSink<WindowedValue<T>> writer = TextSink.createForDirectPipelineRunner( + transform.filenamePrefix, transform.getShardNameTemplate(), transform.filenameSuffix, + numShards, true, null, null, transform.coder); + try (Sink.SinkWriter<WindowedValue<T>> sink = writer.writer()) { + for (T elem : elems) { + sink.add(WindowedValue.valueInGlobalWindow(elem)); + } + } catch (IOException exn) { + throw new RuntimeException( + "unable to write to output file \"" + transform.filenamePrefix + "\"", exn); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index 396d308..5a57f7f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -339,7 +339,6 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> builder.put(Window.Bound.class, AssignWindows.class); builder.put(Write.Bound.class, BatchWrite.class); 
builder.put(AvroIO.Write.Bound.class, BatchAvroIOWrite.class); - builder.put(TextIO.Write.Bound.class, BatchTextIOWrite.class); if (options.getExperiments() == null || !options.getExperiments().contains("disable_ism_side_input")) { builder.put(View.AsMap.class, BatchViewAsMap.class); @@ -1996,111 +1995,6 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> /** * Specialized implementation which overrides - * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write.Bound TextIO.Write.Bound} with - * a native sink instead of a custom sink as workaround until custom sinks - * have support for sharding controls. - */ - private static class BatchTextIOWrite<T> extends PTransform<PCollection<T>, PDone> { - private final TextIO.Write.Bound<T> transform; - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply() - public BatchTextIOWrite(DataflowPipelineRunner runner, TextIO.Write.Bound<T> transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection<T> input) { - if (transform.getNumShards() > 0) { - return input - .apply(new ReshardForWrite<T>()) - .apply(new BatchTextIONativeWrite<>(transform)); - } else { - return transform.apply(input); - } - } - } - - /** - * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way - * to provide the native definition of the Text sink. 
- */ - private static class BatchTextIONativeWrite<T> extends PTransform<PCollection<T>, PDone> { - private final TextIO.Write.Bound<T> transform; - public BatchTextIONativeWrite(TextIO.Write.Bound<T> transform) { - this.transform = transform; - } - - @Override - public PDone apply(PCollection<T> input) { - return PDone.in(input.getPipeline()); - } - - static { - DataflowPipelineTranslator.registerTransformTranslator( - BatchTextIONativeWrite.class, new BatchTextIONativeWriteTranslator()); - } - } - - /** - * TextIO.Write.Bound support code for the Dataflow backend when applying parallelism limits - * through user requested sharding limits. - */ - private static class BatchTextIONativeWriteTranslator - implements TransformTranslator<BatchTextIONativeWrite<?>> { - @SuppressWarnings("unchecked") - @Override - public void translate(@SuppressWarnings("rawtypes") BatchTextIONativeWrite transform, - TranslationContext context) { - translateWriteHelper(transform, transform.transform, context); - } - - private <T> void translateWriteHelper( - BatchTextIONativeWrite<T> transform, - TextIO.Write.Bound<T> originalTransform, - TranslationContext context) { - // Note that the original transform can not be used during add step/add input - // and is only passed in to get properties from it. - - checkState(originalTransform.getNumShards() > 0, - "Native TextSink is expected to only be used when sharding controls are required."); - - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - - // TODO: drop this check when server supports alternative templates. - switch (originalTransform.getShardTemplate()) { - case ShardNameTemplate.INDEX_OF_MAX: - break; // supported by server - case "": - // Empty shard template allowed - forces single output. 
- Preconditions.checkArgument(originalTransform.getNumShards() <= 1, - "Num shards must be <= 1 when using an empty sharding template"); - break; - default: - throw new UnsupportedOperationException("Shard template " - + originalTransform.getShardTemplate() - + " not yet supported by Dataflow service"); - } - - // TODO: How do we want to specify format and - // format-specific properties? - context.addInput(PropertyNames.FORMAT, "text"); - context.addInput(PropertyNames.FILENAME_PREFIX, originalTransform.getFilenamePrefix()); - context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, - originalTransform.getShardNameTemplate()); - context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); - context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); - context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); - context.addEncodingInput( - WindowedValue.getValueOnlyCoder(originalTransform.getCoder())); - - } - } - - /** - * Specialized implementation which overrides * {@link com.google.cloud.dataflow.sdk.io.AvroIO.Write.Bound AvroIO.Write.Bound} with * a native sink instead of a custom sink as workaround until custom sinks * have support for sharding controls. 
@@ -2194,7 +2088,9 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, originalTransform.getShardTemplate()); context.addInput(PropertyNames.FILENAME_SUFFIX, originalTransform.getFilenameSuffix()); context.addInput(PropertyNames.VALIDATE_SINK, originalTransform.needsValidation()); + context.addInput(PropertyNames.NUM_SHARDS, (long) originalTransform.getNumShards()); + context.addEncodingInput( WindowedValue.getValueOnlyCoder( AvroCoder.of(originalTransform.getType(), originalTransform.getSchema()))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index 22ec3bb..885260e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -44,12 +44,14 @@ import com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.io.BigQueryIO; import com.google.cloud.dataflow.sdk.io.PubsubIO; import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.StreamingOptions; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly; import com.google.cloud.dataflow.sdk.runners.dataflow.BigQueryIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.PubsubIOTranslator; import com.google.cloud.dataflow.sdk.runners.dataflow.ReadTranslator; +import 
com.google.cloud.dataflow.sdk.runners.dataflow.TextIOTranslator; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -1035,6 +1037,9 @@ public class DataflowPipelineTranslator { DataflowPipelineRunner.StreamingPubsubIOWrite.class, new PubsubIOTranslator.WriteTranslator()); + registerTransformTranslator( + TextIO.Write.Bound.class, new TextIOTranslator.WriteTranslator()); + registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java new file mode 100644 index 0000000..d6c96c3 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/TextIOTranslator.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.runners.dataflow; + +import com.google.cloud.dataflow.sdk.io.ShardNameTemplate; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TransformTranslator; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; +import com.google.cloud.dataflow.sdk.util.PathValidator; +import com.google.cloud.dataflow.sdk.util.PropertyNames; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.common.base.Preconditions; + +/** + * TextIO transform support code for the Dataflow backend. + */ +public class TextIOTranslator { + /** + * Implements TextIO Write translation for the Dataflow backend. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + public static class WriteTranslator implements TransformTranslator<TextIO.Write.Bound> { + @Override + public void translate( + TextIO.Write.Bound transform, + TranslationContext context) { + translateWriteHelper(transform, context); + } + + private <T> void translateWriteHelper( + TextIO.Write.Bound<T> transform, + TranslationContext context) { + if (context.getPipelineOptions().isStreaming()) { + throw new IllegalArgumentException("TextIO not supported in streaming mode."); + } + + PathValidator validator = context.getPipelineOptions().getPathValidator(); + String filenamePrefix = validator.validateOutputFilePrefixSupported( + transform.getFilenamePrefix()); + + context.addStep(transform, "ParallelWrite"); + context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); + + // TODO: drop this check when server supports alternative templates. + switch (transform.getShardTemplate()) { + case ShardNameTemplate.INDEX_OF_MAX: + break; // supported by server + case "": + // Empty shard template allowed - forces single output. 
+ Preconditions.checkArgument(transform.getNumShards() <= 1, + "Num shards must be <= 1 when using an empty sharding template"); + break; + default: + throw new UnsupportedOperationException("Shard template " + + transform.getShardTemplate() + + " not yet supported by Dataflow service"); + } + + // TODO: How do we want to specify format and + // format-specific properties? + context.addInput(PropertyNames.FORMAT, "text"); + context.addInput(PropertyNames.FILENAME_PREFIX, filenamePrefix); + context.addInput(PropertyNames.SHARD_NAME_TEMPLATE, + transform.getShardNameTemplate()); + context.addInput(PropertyNames.FILENAME_SUFFIX, transform.getFilenameSuffix()); + context.addInput(PropertyNames.VALIDATE_SINK, transform.needsValidation()); + + long numShards = transform.getNumShards(); + if (numShards > 0) { + context.addInput(PropertyNames.NUM_SHARDS, numShards); + } + + context.addEncodingInput( + WindowedValue.getValueOnlyCoder(transform.getCoder())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index 0a8e381..6ad81e4 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -264,6 +264,28 @@ public class TextIOTest { } @Test + public void testWriteSharded() throws IOException { + File outFolder = tmpFolder.newFolder(); + String filename = outFolder.toPath().resolve("output").toString(); + + Pipeline p = TestPipeline.create(); + + PCollection<String> input = + p.apply(Create.of(Arrays.asList(LINES_ARRAY)) + .withCoder(StringUtf8Coder.of())); + + input.apply(TextIO.Write.to(filename).withNumShards(2).withSuffix(".txt")); + + p.run(); + + String[] files = 
outFolder.list(); + + assertThat(Arrays.asList(files), + containsInAnyOrder("output-00000-of-00002.txt", + "output-00001-of-00002.txt")); + } + + @Test public void testWriteNamed() { { PTransform<PCollection<String>, PDone> transform1 = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java index c5f2d3f..c7175cb 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java @@ -453,12 +453,16 @@ public class DataflowPipelineRunnerTest { @Test public void testNonGcsFilePathInWriteFailure() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + + Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); + p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")) + .apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); - pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file")); + p.run(); + assertValidJob(jobCaptor.getValue()); } @Test @@ -478,12 +482,17 @@ public class DataflowPipelineRunnerTest { @Test public void testMultiSlashGcsFileWritePath() throws IOException { - Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = 
p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + + Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); + p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")) + .apply(TextIO.Write.named("WriteInvalidGcsFile") + .to("gs://bucket/tmp//file")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("consecutive slashes"); - pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file")); + p.run(); + assertValidJob(jobCaptor.getValue()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index 72090a0..b9c94ad 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -403,7 +403,7 @@ public class DataflowPipelineTranslatorTest { pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) .apply(ParDo.of(new NoOpFn())) .apply(new EmbeddedTransform(predefinedStep.clone())) - .apply(ParDo.of(new NoOpFn())); + .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out")); Job job = translator.translate( pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); @@ -456,7 +456,7 @@ public class DataflowPipelineTranslatorTest { Job job = translator.translate( pipeline, pipeline.getRunner(), Collections.<DataflowPackage>emptyList()).getJob(); - assertEquals(13, job.getSteps().size()); + assertEquals(3, job.getSteps().size()); Step step = 
job.getSteps().get(1); assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME)); return step; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3904c907/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java index 68e1db1..f1b7cd7 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/TransformTreeTest.java @@ -28,7 +28,6 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.io.Write; import com.google.cloud.dataflow.sdk.transforms.Count; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; @@ -134,12 +133,9 @@ public class TransformTreeTest { assertTrue(visited.add(TransformsSeen.SAMPLE_ANY)); assertNotNull(node.getEnclosingNode()); assertTrue(node.isCompositeNode()); - } else if (transform instanceof Write.Bound) { - assertTrue(visited.add(TransformsSeen.WRITE)); - assertNotNull(node.getEnclosingNode()); - assertTrue(node.isCompositeNode()); } assertThat(transform, not(instanceOf(Read.Bounded.class))); + assertThat(transform, not(instanceOf(TextIO.Write.Bound.class))); } @Override @@ -155,9 +151,10 @@ public class TransformTreeTest { PTransform<?, ?> transform = node.getTransform(); // Pick is a composite, should not be visited here. 
assertThat(transform, not(instanceOf(Sample.SampleAny.class))); - assertThat(transform, not(instanceOf(Write.Bound.class))); if (transform instanceof Read.Bounded) { assertTrue(visited.add(TransformsSeen.READ)); + } else if (transform instanceof TextIO.Write.Bound) { + assertTrue(visited.add(TransformsSeen.WRITE)); } }
