Revise WordCount example to be a better cross-runner example
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b05a8c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b05a8c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b05a8c7 Branch: refs/heads/master Commit: 0b05a8c7ff8e1f76516a6b13d504f776b5c9111e Parents: c64cf36 Author: Kenneth Knowles <[email protected]> Authored: Thu Nov 3 14:19:47 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Nov 8 13:51:25 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/examples/WordCount.java | 62 ++++++++------------ 1 file changed, 23 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0b05a8c7/examples/java/src/main/java/org/apache/beam/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index e7eab6e..5be0ddc 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -17,15 +17,13 @@ */ package org.apache.beam.examples; -import com.google.common.base.Strings; -import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import 
org.apache.beam.sdk.transforms.DoFn; @@ -34,8 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -53,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection; * </a> * * <p>Basic concepts, also in the MinimalWordCount example: - * Reading text files; counting a PCollection; writing to GCS. + * Reading text files; counting a PCollection; writing to text files * * <p>New Concepts: * <pre> @@ -63,30 +59,31 @@ import org.apache.beam.sdk.values.PCollection; * 4. Defining your own pipeline options * </pre> * - * <p>Concept #1: you can execute this pipeline either locally or using the selected runner. + * <p>Concept #1: you can execute this pipeline either locally or by selecting another runner. * These are now command-line options and not hard-coded as they were in the MinimalWordCount * example. - * To execute this pipeline locally, specify a local output file or output prefix on GCS: - * <pre>{@code - * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * }</pre> * * <p>To change the runner, specify: * <pre>{@code * --runner=YOUR_SELECTED_RUNNER * } * </pre> - * See examples/java/README.md for instructions about how to configure different runners. * - * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} - * and can be overridden with {@code --inputFile}. + * <p>To execute this pipeline, specify a local output file (if using the + * {@code DirectRunner}) or output prefix on a supported distributed file system. 
+ * <pre>{@code * --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX] * }</pre> * * <p>The input file defaults to a public data set containing the text of King Lear, * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. */ public class WordCount { /** - * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out- - * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the - * pipeline. + * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns + * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it + * to a ParDo in the pipeline. */ static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = @@ -153,36 +150,23 @@ public class WordCount { * <p>Inherits standard configuration options. */ public interface WordCountOptions extends PipelineOptions { + + /** + * By default, this example reads from a public dataset containing the text of + * King Lear. Set this option to choose a different input file or glob. + */ @Description("Path of the file to read from") @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); + /** + * Set this required option to specify where to write the output. + */ @Description("Path of the file to write to") - @Default.InstanceFactory(OutputFactory.class) + @Required String getOutput(); void setOutput(String value); - - /** - * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination. 
- */ - class OutputFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - String tempLocation = options.getTempLocation(); - if (!Strings.isNullOrEmpty(tempLocation)) { - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - return factory.resolve(tempLocation, "counts.txt"); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve temp location: %s", tempLocation)); - } - } else { - throw new IllegalArgumentException("Must specify --output or --tempLocation"); - } - } - } } public static void main(String[] args) {
