Update examples archetype to match examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1e132ee8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1e132ee8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1e132ee8 Branch: refs/heads/master Commit: 1e132ee83d5f393498c12003a328e51d0e93bd06 Parents: 9f78c44 Author: Kenneth Knowles <k...@google.com> Authored: Thu Nov 10 12:06:42 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Nov 10 14:06:39 2016 -0800 ---------------------------------------------------------------------- .../src/main/java/DebuggingWordCount.java | 69 +++++--------------- .../src/main/java/MinimalWordCount.java | 52 +++++++-------- .../src/main/java/WindowedWordCount.java | 6 +- .../src/main/java/WordCount.java | 64 +++++++----------- .../common/ExampleBigQueryTableOptions.java | 2 +- .../src/main/java/common/ExampleOptions.java | 5 ++ ...xamplePubsubTopicAndSubscriptionOptions.java | 2 +- .../java/common/ExamplePubsubTopicOptions.java | 2 +- .../src/main/java/common/ExampleUtils.java | 3 +- .../src/test/java/DebuggingWordCountTest.java | 15 ++++- .../src/test/java/WordCountTest.java | 7 +- 11 files changed, 91 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java index 9727379..99ae796 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ 
b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java @@ -50,10 +50,9 @@ import org.slf4j.LoggerFactory; * * <p>New Concepts: * <pre> - * 1. Logging to Cloud Logging - * 2. Controlling worker log levels - * 3. Creating a custom aggregator - * 4. Testing your Pipeline via PAssert + * 1. Logging using SLF4J, even in a distributed environment + * 2. Creating a custom aggregator (runners have varying levels of support) + * 3. Testing your Pipeline via PAssert * </pre> * * <p>To execute this pipeline locally, specify general pipeline configuration: @@ -68,51 +67,20 @@ import org.slf4j.LoggerFactory; * } * </pre> * - * <p>To use the additional logging discussed below, specify: - * <pre>{@code - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * } - * </pre> - * - * <p>Note that when you run via <code>mvn exec</code>, you may need to escape - * the quotations as appropriate for your shell. For example, in <code>bash</code>: - * <pre> - * mvn compile exec:java ... \ - * -Dexec.args="... \ - * --workerLogLevelOverrides={\\\"org.apache.beam.examples\\\":\\\"DEBUG\\\"}" - * </pre> + * <p>The input file defaults to a public data set containing the text of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. * - * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud - * Logging by default at "INFO" log level and higher.
One may override log levels for specific - * logging namespaces by specifying: - * <pre><code> - * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...} - * </code></pre> - * For example, by specifying: - * <pre><code> - * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} - * </code></pre> - * when executing this pipeline using the Dataflow service, Cloud Logging would contain only - * "DEBUG" or higher level logs for the {@code org.apache.beam.examples} package in - * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker - * logging configuration can be overridden by specifying - * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example, - * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with - * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note - * that changing the default worker log level to TRACE or DEBUG will significantly increase - * the amount of logs output. - * - * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} - * and can be overridden with {@code --inputFile}. */ public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { /** - * Concept #1: The logger below uses the fully qualified class name of FilterTextFn - * as the logger. All log statements emitted by this logger will be referenced by this name - * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging - * about the Cloud Logging UI. + * Concept #1: The logger below uses the fully qualified class name of FilterTextFn as the + * logger. Depending on your SLF4J configuration, log statements will likely be qualified by + * this name. + * + * <p>Note that this is entirely standard SLF4J usage. 
Some runners may provide a default SLF4J + * configuration that is most appropriate for their logging integration. */ private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); @@ -122,11 +90,9 @@ public class DebuggingWordCount { } /** - * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those - * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the - * Dataflow service. These aggregators below track the number of matched and unmatched words. - * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about - * the Dataflow Monitoring UI. + * Concept #2: A custom aggregator can track values in your pipeline as it runs. Each + * runner provides varying levels of support for aggregators, and may expose them + * in a dashboard, etc. */ private final Aggregator<Long, Long> matchedWords = createAggregator("matchedWords", new Sum.SumLongFn()); @@ -137,8 +103,7 @@ public class DebuggingWordCount { public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline - // using the Dataflow service, these log lines will appear in the Cloud Logging UI - // only if the log level is set to "DEBUG" or lower. + // these log lines will appear only if the log level is set to "DEBUG" or lower. LOG.debug("Matched: " + c.element().getKey()); matchedWords.addValue(1L); c.output(c.element()); @@ -178,7 +143,7 @@ public class DebuggingWordCount { .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); /** - * Concept #4: PAssert is a set of convenient PTransforms in the style of + * Concept #3: PAssert is a set of convenient PTransforms in the style of * Hamcrest's collection matchers that can be used when writing Pipeline level tests * to validate the contents of PCollections. 
PAssert is best used in unit tests * with small data sets but is demonstrated here as a teaching tool. @@ -194,6 +159,6 @@ public class DebuggingWordCount { KV.of("stomach", 1L)); PAssert.that(filteredWords).containsInAnyOrder(expectedResults); - p.run(); + p.run().waitUntilFinish(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java index e8497c0..97bd824 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java @@ -37,46 +37,33 @@ import org.apache.beam.sdk.values.KV; * argument processing, and focus on construction of the pipeline, which chains together the * application of core transforms. * - * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally - * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional + * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the + * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional * concepts. * * <p>Concepts: + * * <pre> * 1. Reading data from text files * 2. Specifying 'inline' transforms - * 3. Counting a PCollection - * 4. Writing data to Cloud Storage as text files + * 3. Counting items in a PCollection + * 4. 
Writing data to text files * </pre> * - * <p>To execute this pipeline, first edit the code to set your project ID, the temp - * location, and the output location. The specified GCS bucket(s) must already exist. - * - * <p>Then, run the pipeline as described in the README. It will be deployed and run with the - * selected runner. No args are required to run the pipeline. You can see the results in your - * output bucket in the GCS browser. + * <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You + * can see the results in the output files in your current working directory, with names like + * "wordcounts-00001-of-00005". When running on a distributed service, you would use an appropriate + * file service. */ public class MinimalWordCount { public static void main(String[] args) { // Create a PipelineOptions object. This object lets us set various execution - // options for our pipeline, such as the associated Cloud Platform project and the location - // in Google Cloud Storage to stage files. + // options for our pipeline, such as the runner you wish to use. This example + // will run with the DirectRunner by default, based on the class path configured + // in its dependencies. PipelineOptions options = PipelineOptionsFactory.create(); - // In order to run your pipeline, you need to make following runner specific changes: - // - // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner. - // CHANGE 2/3: Specify runner-required options. - // For DataflowRunner, set project and temp location as follows: - // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - // dataflowOptions.setRunner(DataflowRunner.class); - // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); - // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); - // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions} - // for more details.
- // options.setRunner(FlinkRunner.class); - // Create the Pipeline object with the options we defined above. Pipeline p = Pipeline.create(options); @@ -85,7 +72,10 @@ public class MinimalWordCount { // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set // of input text files. TextIO.Read returns a PCollection where each element is one line from // the input text (a set of Shakespeare's texts). + + // This example reads a public data set consisting of the complete works of Shakespeare. p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) + // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a // DoFn (defined in-line) on each element that tokenizes the text line into individual words. // The ParDo returns a PCollection<String>, where each element is an individual word in @@ -100,10 +90,12 @@ public class MinimalWordCount { } } })) + // Concept #3: Apply the Count transform to our PCollection of individual words. The Count // transform returns a new PCollection of key/value pairs, where each key represents a unique // word in the text. The associated value is the occurrence count for that word. .apply(Count.<String>perElement()) + // Apply a MapElements transform that formats our PCollection of word counts into a printable // string, suitable for writing to an output file. .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @@ -112,13 +104,15 @@ public class MinimalWordCount { return input.getKey() + ": " + input.getValue(); } })) + // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline. // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of - // formatted strings) to a series of text files in Google Cloud Storage. - // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. 
- .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); + // formatted strings) to a series of text files. + // + // By default, it will write to a set of files with names like wordcounts-00001-of-00005 + .apply(TextIO.Write.to("wordcounts")); // Run the pipeline. - p.run(); + p.run().waitUntilFinish(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java index c92a37c..2812531 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java @@ -17,9 +17,6 @@ */ package ${package}; -import ${package}.common.ExampleBigQueryTableOptions; -import ${package}.common.ExampleOptions; -import ${package}.common.ExampleUtils; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; @@ -27,6 +24,9 @@ import com.google.api.services.bigquery.model.TableSchema; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import ${package}.common.ExampleBigQueryTableOptions; +import ${package}.common.ExampleOptions; +import ${package}.common.ExampleUtils; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java index 80bfd3a..8fe7137 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java @@ -17,15 +17,13 @@ */ package ${package}; -import com.google.common.base.Strings; -import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation.Required; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -34,8 +32,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.IOChannelFactory; -import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -53,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection; * </a> * * <p>Basic concepts, also in the MinimalWordCount example: - * Reading text files; counting a PCollection; writing to GCS. 
+ * Reading text files; counting a PCollection; writing to text files * * <p>New Concepts: * <pre> @@ -63,30 +59,31 @@ import org.apache.beam.sdk.values.PCollection; * 4. Defining your own pipeline options * </pre> * - * <p>Concept #1: you can execute this pipeline either locally or using the selected runner. + * <p>Concept #1: you can execute this pipeline either locally or by selecting another runner. * These are now command-line options and not hard-coded as they were in the MinimalWordCount * example. - * To execute this pipeline locally, specify a local output file or output prefix on GCS: - * <pre>{@code - * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * }</pre> * * <p>To change the runner, specify: * <pre>{@code * --runner=YOUR_SELECTED_RUNNER * } * </pre> - * See examples/java/README.md for instructions about how to configure different runners. * - * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} - * and can be overridden with {@code --inputFile}. + * <p>To execute this pipeline, specify a local output file (if using the + * {@code DirectRunner}) or output prefix on a supported distributed file system. + * <pre>{@code + * --output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX] + * }</pre> + * + * <p>The input file defaults to a public data set containing the text of King Lear, + * by William Shakespeare. You can override it and choose your own input with {@code --inputFile}. */ public class WordCount { /** - * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out- - * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the - * pipeline. + * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns + * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it + * to a ParDo in the pipeline.
*/ static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = @@ -153,36 +150,23 @@ public class WordCount { * <p>Inherits standard configuration options. */ public interface WordCountOptions extends PipelineOptions { + + /** + * By default, this example reads from a public dataset containing the text of + * King Lear. Set this option to choose a different input file or glob. + */ @Description("Path of the file to read from") @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); + /** + * Set this required option to specify where to write the output. + */ @Description("Path of the file to write to") - @Default.InstanceFactory(OutputFactory.class) + @Required String getOutput(); void setOutput(String value); - - /** - * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination. - */ - public static class OutputFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - String tempLocation = options.getTempLocation(); - if (!Strings.isNullOrEmpty(tempLocation)) { - try { - IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); - return factory.resolve(tempLocation, "counts.txt"); - } catch (IOException e) { - throw new RuntimeException( - String.format("Failed to resolve temp location: %s", tempLocation)); - } - } else { - throw new IllegalArgumentException("Must specify --output or --tempLocation"); - } - } - } } public static void main(String[] args) { @@ -197,6 +181,6 @@ public class WordCount { .apply(MapElements.via(new FormatAsTextFn())) .apply("WriteCounts", TextIO.Write.to(options.getOutput())); - p.run(); + p.run().waitUntilFinish(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java 
---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java index 96e8406..6b51074 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java @@ -46,7 +46,7 @@ public interface ExampleBigQueryTableOptions extends GcpOptions { /** * Returns the job name as the default BigQuery table name. */ - static class BigQueryTableFactory implements DefaultValueFactory<String> { + class BigQueryTableFactory implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { return options.getJobName().replace('-', '_'); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java index 221e266..90f935c 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java @@ -29,4 +29,9 @@ public interface ExampleOptions extends PipelineOptions { @Default.Boolean(false) boolean getKeepJobsRunning(); void setKeepJobsRunning(boolean 
keepJobsRunning); + + @Description("Number of workers to use when executing the injector pipeline") + @Default.Integer(1) + int getInjectorNumWorkers(); + void setInjectorNumWorkers(int numWorkers); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java index e3fb132..daeb398 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java @@ -35,7 +35,7 @@ public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubT /** * Returns a default Pub/Sub subscription based on the project and the job names. 
*/ - static class PubsubSubscriptionFactory implements DefaultValueFactory<String> { + class PubsubSubscriptionFactory implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { return "projects/" + options.as(GcpOptions.class).getProject() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java index 1825267..936bff5 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java @@ -35,7 +35,7 @@ public interface ExamplePubsubTopicOptions extends GcpOptions { /** * Returns a default Pub/Sub topic based on the project and the job names. 
*/ - static class PubsubTopicFactory implements DefaultValueFactory<String> { + class PubsubTopicFactory implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { return "projects/" + options.as(GcpOptions.class).getProject() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java index c1b6489..570b382 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java @@ -272,8 +272,7 @@ public class ExampleUtils { } /** - * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used, - * waits for the pipeline to finish and cancels it (and the injector) before the program exists. + * Waits for the pipeline to finish and cancels it before the program exits.
*/ public void waitToFinish(PipelineResult result) { pipelinesToCancel.add(result); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java index dfa1a75..155242d 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java @@ -20,6 +20,8 @@ package ${package}; import com.google.common.io.Files; import java.io.File; import java.nio.charset.StandardCharsets; +import ${package}.DebuggingWordCount.WordCountOptions; +import org.apache.beam.sdk.testing.TestPipeline; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -35,9 +37,16 @@ public class DebuggingWordCountTest { @Test public void testDebuggingWordCount() throws Exception { - File file = tmpFolder.newFile(); - Files.write("stomach secret Flourish message Flourish here Flourish", file, + File inputFile = tmpFolder.newFile(); + File outputFile = tmpFolder.newFile(); + Files.write( + "stomach secret Flourish message Flourish here Flourish", + inputFile, StandardCharsets.UTF_8); - DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()}); + WordCountOptions options = + TestPipeline.testingPipelineOptions().as(WordCountOptions.class); + options.setInputFile(inputFile.getAbsolutePath()); + options.setOutput(outputFile.getAbsolutePath()); + DebuggingWordCount.main(TestPipeline.convertToArgs(options)); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1e132ee8/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java index 83d0f37..e86c2aa 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java @@ -17,12 +17,11 @@ */ package ${package}; +import java.util.Arrays; +import java.util.List; import ${package}.WordCount.CountWords; import ${package}.WordCount.ExtractWordsFn; import ${package}.WordCount.FormatAsTextFn; - -import java.util.Arrays; -import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; @@ -81,6 +80,6 @@ public class WordCountTest { .apply(MapElements.via(new FormatAsTextFn())); PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); - p.run(); + p.run().waitUntilFinish(); } }