Update Beam examples archetypes
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b9a66e4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b9a66e4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b9a66e4b Branch: refs/heads/master Commit: b9a66e4b50ae1fe5fa3afc33b2523e2f9d64b2c4 Parents: e3768f6 Author: Pei He <[email protected]> Authored: Thu Sep 8 19:16:12 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue Sep 13 18:01:08 2016 -0700 ---------------------------------------------------------------------- .../src/main/java/DebuggingWordCount.java | 31 +- .../src/main/java/MinimalWordCount.java | 51 ++- .../src/main/java/WindowedWordCount.java | 139 +++---- .../src/main/java/WordCount.java | 77 ++-- .../java/common/DataflowExampleOptions.java | 32 -- .../main/java/common/DataflowExampleUtils.java | 391 ------------------- .../common/ExampleBigQueryTableOptions.java | 11 +- .../src/main/java/common/ExampleOptions.java | 37 ++ ...xamplePubsubTopicAndSubscriptionOptions.java | 45 +++ .../java/common/ExamplePubsubTopicOptions.java | 17 +- .../src/main/java/common/ExampleUtils.java | 353 +++++++++++++++++ .../main/java/common/PubsubFileInjector.java | 8 +- .../src/test/java/WordCountTest.java | 9 +- 13 files changed, 592 insertions(+), 609 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java index e9f4333..e315ba9 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; @@ -36,8 +36,9 @@ import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** - * An example that verifies word counts in Shakespeare and includes Dataflow best practices. + * An example that verifies word counts in Shakespeare and includes Beam best practices. * * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount} @@ -46,12 +47,12 @@ import org.slf4j.LoggerFactory; * * <p>Basic concepts, also in the MinimalWordCount and WordCount examples: * Reading text files; counting a PCollection; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns. + * and using a selected runner; defining DoFns. * * <p>New Concepts: * <pre> * 1. Logging to Cloud Logging - * 2. 
Controlling Dataflow worker log levels + * 2. Controlling worker log levels * 3. Creating a custom aggregator * 4. Testing your Pipeline via PAssert * </pre> @@ -62,12 +63,14 @@ import org.slf4j.LoggerFactory; * } * </pre> * - * <p>To execute this pipeline using the Dataflow service and the additional logging discussed - * below, specify pipeline configuration: + * <p>To change the runner, specify: + * <pre>{@code + * --runner=YOUR_SELECTED_RUNNER + * } + * </pre> + * + * <p>To use the additional logging discussed below, specify: * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowRunner * --workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"} * } * </pre> @@ -100,12 +103,12 @@ import org.slf4j.LoggerFactory; * that changing the default worker log level to TRACE or DEBUG will significantly increase * the amount of logs output. * - * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. + * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} + * and can be overridden with {@code --inputFile}. */ public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ - public static class FilterTextFn extends OldDoFn<KV<String, Long>, KV<String, Long>> { + public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { /** * Concept #1: The logger below uses the fully qualified class name of FilterTextFn * as the logger. All log statements emitted by this logger will be referenced by this name @@ -131,7 +134,7 @@ public class DebuggingWordCount { private final Aggregator<Long, Long> unmatchedWords = createAggregator("umatchedWords", new Sum.SumLongFn()); - @Override + @ProcessElement public void processElement(ProcessContext c) { if (filter.matcher(c.element().getKey()).matches()) { // Log at the "DEBUG" level each element that we match. When executing this pipeline @@ -149,7 +152,7 @@ public class DebuggingWordCount { } } } - + /** * Options supported by {@link DebuggingWordCount}. 
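A note on the DoFn migration visible above: the old OldDoFn overrides processElement directly, while the new DoFn locates its entry point through the @ProcessElement annotation (ProcessContext and the annotation are nested types inherited from DoFn, so no extra import is needed). Below is a minimal sketch of the new style, assuming only the Beam Java SDK APIs already used in this commit (the class name, filter condition, and aggregator name are illustrative, not part of the commit):

    import org.apache.beam.sdk.transforms.Aggregator;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;

    /** Illustrative new-style DoFn, patterned on the FilterTextFn change above. */
    public class ShortWordFilterFn extends DoFn<KV<String, Long>, KV<String, Long>> {
      // Concept #3: aggregator values are reported to the runner's monitoring UI.
      private final Aggregator<Long, Long> droppedWords =
          createAggregator("droppedWords", new Sum.SumLongFn());

      // The runner finds this method via the annotation, not via @Override.
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element().getKey().length() > 3) {
          c.output(c.element());
        } else {
          droppedWords.addValue(1L);
        }
      }
    }

The PAssert-based testing pattern (concept #4 above) applies unchanged to pipelines built from DoFns written in this style.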
* http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java index 55beb1f..f739fd8 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java @@ -17,14 +17,15 @@ */ package ${package}; -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; @@ -48,26 +49,34 @@ import org.apache.beam.sdk.values.KV; * 4. Writing data to Cloud Storage as text files * </pre> * - * <p>To execute this pipeline, first edit the code to set your project ID, the staging + * <p>To execute this pipeline, first edit the code to set your project ID, the temp * location, and the output location. The specified GCS bucket(s) must already exist. * - * <p>Then, run the pipeline as described in the README. It will be deployed and run using the - * Dataflow service. No args are required to run the pipeline. You can see the results in your + * <p>Then, run the pipeline as described in the README. It will be deployed and run with the + * selected runner. No args are required to run the pipeline. You can see the results in your * output bucket in the GCS browser. */ public class MinimalWordCount { public static void main(String[] args) { - // Create a DataflowPipelineOptions object. This object lets us set various execution + // Create a PipelineOptions object. This object lets us set various execution // options for our pipeline, such as the associated Cloud Platform project and the location // in Google Cloud Storage to stage files. - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - options.setRunner(BlockingDataflowRunner.class); - // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud. - options.setProject("SET_YOUR_PROJECT_ID_HERE"); - // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files. - options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY"); + PipelineOptions options = PipelineOptionsFactory.create(); + + // In order to run your pipeline, you need to make following runner specific changes: + // + // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner. + // CHANGE 2/3: Specify runner-required options. 
+ // For DataflowRunner, set project and temp location as follows: + // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + // dataflowOptions.setRunner(DataflowRunner.class); + // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); + // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); + // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions} + // for more details. + // options.as(FlinkPipelineOptions.class) + // .setRunner(FlinkRunner.class); // Create the Pipeline object with the options we defined above. Pipeline p = Pipeline.create(options); @@ -77,13 +86,13 @@ public class MinimalWordCount { // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set // of input text files. TextIO.Read returns a PCollection where each element is one line from // the input text (a set of Shakespeare's texts). - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) + p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a // DoFn (defined in-line) on each element that tokenizes the text line into individual words. // The ParDo returns a PCollection<String>, where each element is an individual word in // Shakespeare's collected texts. - .apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() { - @Override + .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { + @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { if (!word.isEmpty()) { @@ -96,12 +105,12 @@ public class MinimalWordCount { // transform returns a new PCollection of key/value pairs, where each key represents a unique // word in the text. The associated value is the occurrence count for that word. .apply(Count.<String>perElement()) - // Apply another ParDo transform that formats our PCollection of word counts into a printable + // Apply a MapElements transform that formats our PCollection of word counts into a printable // string, suitable for writing to an output file. - .apply("FormatResults", ParDo.of(new OldDoFn<KV<String, Long>, String>() { + .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @Override - public void processElement(ProcessContext c) { - c.output(c.element().getKey() + ": " + c.element().getValue()); + public String apply(KV<String, Long> input) { + return input.getKey() + ": " + input.getValue(); } })) // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline. 
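Taken together, the MinimalWordCount changes above yield this shape: read text, tokenize with a new-style DoFn, count, format one-to-one with MapElements, and finish with the TextIO.Write transform named in the last comment. A compact, self-contained sketch under the same pre-2.0 TextIO API used throughout this commit (the class name and output path are illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;

    public class TinyWordCount {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Read lines, then split each line into words with a new-style DoFn.
        p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
         .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             for (String word : c.element().split("[^a-zA-Z']+")) {
               if (!word.isEmpty()) {
                 c.output(word);
               }
             }
           }
         }))
         // Count occurrences of each distinct word.
         .apply(Count.<String>perElement())
         // One-to-one formatting via MapElements and a SimpleFunction.
         .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
           @Override
           public String apply(KV<String, Long> input) {
             return input.getKey() + ": " + input.getValue();
           }
         }))
         // Concept #4: the write transform referenced above (output path illustrative).
         .apply(TextIO.Write.to("gs://YOUR_BUCKET_NAME/counts"));

        p.run();
      }
    }

MapElements.via(new SimpleFunction<...>()) is the idiomatic replacement for a ParDo whose DoFn emits exactly one output per input, which is why this commit rewrites the FormatResults step that way.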
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java index 17bf7ca..787e8c9 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java @@ -17,23 +17,24 @@ */ package ${package}; +import ${package}.common.ExampleBigQueryTableOptions; +import ${package}.common.ExampleOptions; +import ${package}.common.ExampleUtils; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; -import ${package}.common.DataflowExampleUtils; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.PubsubIO; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -41,8 +42,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + /** * An example that counts words in text, and can run over either unbounded or bounded input @@ -54,58 +54,43 @@ import org.slf4j.LoggerFactory; * * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples: * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns; creating a custom aggregator; + * and using a selected runner; defining DoFns; creating a custom aggregator; * user-defined PTransforms; defining PipelineOptions. * * <p>New Concepts: * <pre> * 1. Unbounded and bounded pipeline input modes * 2. Adding timestamps to data - * 3. PubSub topics as sources - * 4. Windowing - * 5. Re-using PTransforms over windowed PCollections - * 6. Writing to BigQuery - * </pre> - * - * <p>To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } + * 3. Windowing + * 4. Re-using PTransforms over windowed PCollections + * 5. Writing to BigQuery * </pre> * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <p>By default, the examples will run with the {@code DirectRunner}. 
+ * To change the runner, specify: * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowRunner + * --runner=YOUR_SELECTED_RUNNER * } * </pre> + * See examples/java/README.md for instructions about how to configure different runners. * * <p>Optionally specify the input file path via: * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}. + * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}. * * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't * specify the table, one will be created for you using the job name. If you don't specify the - * dataset, a dataset called {@code dataflow-examples} must already exist in your project. + * dataset, a dataset called {@code beam_examples} must already exist in your project. * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}. * - * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or - * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set - * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from - * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not - * exist, the pipeline will create one for you. It will delete this topic when it terminates. - * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub - * topic with the contents of the {@code --inputFile}, in order to make the example easy to run. - * If you want to use an independently-populated PubSub topic, indicate this by setting - * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started. - * * <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10} * for 10-minute windows. + * + * <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C) + * and then exits. */ public class WindowedWordCount { - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); static final int WINDOW_SIZE = 1; // Default window duration in minutes /** @@ -116,14 +101,19 @@ public class WindowedWordCount { * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a * 2-hour period. */ - static class AddTimestampFn extends OldDoFn<String, String> { - private static final long RAND_RANGE = 7200000; // 2 hours in ms + static class AddTimestampFn extends DoFn<String, String> { + private static final Duration RAND_RANGE = Duration.standardHours(2); + private final Instant minTimestamp; - @Override + AddTimestampFn() { + this.minTimestamp = new Instant(System.currentTimeMillis()); + } + + @ProcessElement public void processElement(ProcessContext c) { // Generate a timestamp that falls somewhere in the past two hours. - long randomTimestamp = System.currentTimeMillis() - - (int) (Math.random() * RAND_RANGE); + long randMillis = (long) (Math.random() * RAND_RANGE.getMillis()); + Instant randomTimestamp = minTimestamp.plus(randMillis); /** * Concept #2: Set the data element with that timestamp. */ @@ -132,8 +122,8 @@ public class WindowedWordCount { } /** A DoFn that converts a Word and Count into a BigQuery table row. 
*/ - static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> { - @Override + static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> { + @ProcessElement public void processElement(ProcessContext c) { TableRow row = new TableRow() .set("word", c.element().getKey()) @@ -157,7 +147,7 @@ public class WindowedWordCount { } /** - * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one + * Concept #5: We'll stream the results to a BigQuery table. The BigQuery output source is one * that supports both bounded and unbounded data. This is a helper method that creates a * TableReference from input options, to tell the pipeline where to write its BigQuery results. */ @@ -173,56 +163,39 @@ public class WindowedWordCount { * Options supported by {@link WindowedWordCount}. * * <p>Inherits standard example configuration options, which allow specification of the BigQuery - * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for + * table, as well as the {@link WordCount.WordCountOptions} support for * specification of the input file. */ - public static interface Options - extends WordCount.WordCountOptions, DataflowExampleUtils.DataflowExampleUtilsOptions { + public static interface Options extends WordCount.WordCountOptions, + ExampleOptions, ExampleBigQueryTableOptions { @Description("Fixed window duration, in minutes") @Default.Integer(WINDOW_SIZE) Integer getWindowSize(); void setWindowSize(Integer value); - - @Description("Whether to run the pipeline with unbounded input") - boolean isUnbounded(); - void setUnbounded(boolean value); } public static void main(String[] args) throws IOException { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); options.setBigQuerySchema(getSchema()); - // DataflowExampleUtils creates the necessary input sources to simplify execution of this - // Pipeline. - DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options, - options.isUnbounded()); + // ExampleUtils creates the necessary input sources to simplify execution of this Pipeline. + ExampleUtils exampleUtils = new ExampleUtils(options); + exampleUtils.setup(); Pipeline pipeline = Pipeline.create(options); /** - * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or + * Concept #1: the Beam SDK lets us run the same pipeline with either a bounded or * unbounded input source. */ - PCollection<String> input; - if (options.isUnbounded()) { - LOG.info("Reading from PubSub."); - /** - * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't - * specified as an argument. The data elements' timestamps will come from the pubsub - * injection. - */ - input = pipeline - .apply(PubsubIO.Read.topic(options.getPubsubTopic())); - } else { - /** Else, this is a bounded pipeline. Read from the GCS file. */ - input = pipeline - .apply(TextIO.Read.from(options.getInputFile())) - // Concept #2: Add an element timestamp, using an artificial time just to show windowing. - // See AddTimestampFn for more detail on this. - .apply(ParDo.of(new AddTimestampFn())); - } + PCollection<String> input = pipeline + /** Read from the GCS file. */ + .apply(TextIO.Read.from(options.getInputFile())) + // Concept #2: Add an element timestamp, using an artificial time just to show windowing. + // See AddTimestampFn for more detail on this. + .apply(ParDo.of(new AddTimestampFn())); /** - * Concept #4: Window into fixed windows. 
The fixed window size for this example defaults to 1 + * Concept #3: Window into fixed windows. The fixed window size for this example defaults to 1 * minute (you can change this with a command-line option). See the documentation for more * information on how fixed windows work, and for information on the other types of windowing * available (e.g., sliding windows). @@ -232,29 +205,25 @@ public class WindowedWordCount { FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); /** - * Concept #5: Re-use our existing CountWords transform that does not have knowledge of + * Concept #4: Re-use our existing CountWords transform that does not have knowledge of * windows over a PCollection containing windowed values. */ PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords()); /** - * Concept #6: Format the results for a BigQuery table, then write to BigQuery. + * Concept #5: Format the results for a BigQuery table, then write to BigQuery. * The BigQuery output source supports both bounded and unbounded data. */ wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) - .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema())); + .apply(BigQueryIO.Write + .to(getTableReference(options)) + .withSchema(getSchema()) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); PipelineResult result = pipeline.run(); - /** - * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that - * runs for a limited time, and publishes to the input PubSub topic. - * - * With an unbounded input source, you will need to explicitly shut down this pipeline when you - * are done with it, so that you do not continue to be charged for the instances. You can do - * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow - * pipelines. The PubSub topic will also be deleted at this time. - */ - exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result); + // ExampleUtils will try to cancel the pipeline before the program exists. 
+ exampleUtils.waitToFinish(result); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java index 5432036..b096d8d 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java @@ -17,7 +17,8 @@ */ package ${package}; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import com.google.common.base.Strings; +import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.Default; @@ -27,17 +28,19 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - /** - * An example that counts words in Shakespeare and includes Dataflow best practices. + * An example that counts words in Shakespeare and includes Beam best practices. * * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. @@ -45,8 +48,8 @@ import org.apache.beam.sdk.values.PCollection; * pipeline, for introduction of additional concepts. * * <p>For a detailed walkthrough of this example, see - * <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example"> - * https://cloud.google.com/dataflow/java-sdk/wordcount-example + * <a href="http://beam.incubator.apache.org/use/walkthroughs/"> + * http://beam.incubator.apache.org/use/walkthroughs/ * </a> * * <p>Basic concepts, also in the MinimalWordCount example: @@ -54,39 +57,29 @@ import org.apache.beam.sdk.values.PCollection; * * <p>New Concepts: * <pre> - * 1. Executing a Pipeline both locally and using the Dataflow service + * 1. Executing a Pipeline both locally and using the selected runner * 2. Using ParDo with static DoFns defined out-of-line * 3. Building a composite transform * 4. Defining your own pipeline options * </pre> * - * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service. + * <p>Concept #1: you can execute this pipeline either locally or using the selected runner. * These are now command-line options and not hard-coded as they were in the MinimalWordCount * example. 
- * To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } - * </pre> - * and a local output file or output prefix on GCS: + * To execute this pipeline locally, specify a local output file or output prefix on GCS: * <pre>{@code * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] * }</pre> * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <p>To change the runner, specify: * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowRunner + * --runner=YOUR_SELECTED_RUNNER * } * </pre> - * and an output prefix on GCS: - * <pre>{@code - * --output=gs://YOUR_OUTPUT_PREFIX - * }</pre> + * See examples/java/README.md for instructions about how to configure different runners. * - * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. + * <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} + * and can be overridden with {@code --inputFile}. */ public class WordCount { @@ -95,11 +88,11 @@ public class WordCount { * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the * pipeline. */ - static class ExtractWordsFn extends OldDoFn<String, String> { + static class ExtractWordsFn extends DoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); - @Override + @ProcessElement public void processElement(ProcessContext c) { if (c.element().trim().isEmpty()) { emptyLines.addValue(1L); @@ -117,11 +110,11 @@ public class WordCount { } } - /** A DoFn that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends OldDoFn<KV<String, Long>, String> { + /** A SimpleFunction that converts a Word and Count into a printable string. */ + public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { @Override - public void processElement(ProcessContext c) { - c.output(c.element().getKey() + ": " + c.element().getValue()); + public String apply(KV<String, Long> input) { + return input.getKey() + ": " + input.getValue(); } } @@ -161,7 +154,7 @@ public class WordCount { */ public static interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") + @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); @@ -171,21 +164,25 @@ public class WordCount { void setOutput(String value); /** - * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination. + * Returns "gs://${YOUR_TEMP_DIRECTORY}/counts.txt" as the default destination. 
*/ public static class OutputFactory implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - if (dataflowOptions.getStagingLocation() != null) { - return GcsPath.fromUri(dataflowOptions.getStagingLocation()) - .resolve("counts.txt").toString(); + String tempLocation = options.getTempLocation(); + if (!Strings.isNullOrEmpty(tempLocation)) { + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + return factory.resolve(tempLocation, "counts.txt"); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve temp location: %s", tempLocation)); + } } else { - throw new IllegalArgumentException("Must specify --output or --stagingLocation"); + throw new IllegalArgumentException("Must specify --output or --tempLocation"); } } } - } public static void main(String[] args) { @@ -197,7 +194,7 @@ public class WordCount { // static FormatAsTextFn() to the ParDo transform. p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new CountWords()) - .apply(ParDo.of(new FormatAsTextFn())) + .apply(MapElements.via(new FormatAsTextFn())) .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java deleted file mode 100644 index e3bf7c5..0000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package ${package}.common; - -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; - -/** - * Options that can be used to configure the Dataflow examples. 
- */ -public interface DataflowExampleOptions extends DataflowPipelineOptions { - @Description("Whether to keep jobs running on the Dataflow service after local process exit") - @Default.Boolean(false) - boolean getKeepJobsRunning(); - void setKeepJobsRunning(boolean keepJobsRunning); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java deleted file mode 100644 index 9e6be78..0000000 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java +++ /dev/null @@ -1,391 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package ${package}.common; - -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Datasets; -import com.google.api.services.bigquery.Bigquery.Tables; -import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.DatasetReference; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.Topic; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import javax.servlet.http.HttpServletResponse; -import org.apache.beam.runners.dataflow.BlockingDataflowRunner; -import org.apache.beam.runners.dataflow.DataflowPipelineJob; -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.transforms.IntraBundleParallelization; -import org.apache.beam.sdk.util.Transport; - -/** - * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub - * injector, and cancels the streaming and the injector pipelines once the program terminates. - * - * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes. - */ -public class DataflowExampleUtils { - - private final DataflowPipelineOptions options; - private Bigquery bigQueryClient = null; - private Pubsub pubsubClient = null; - private Dataflow dataflowClient = null; - private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet(); - private List<String> pendingMessages = Lists.newArrayList(); - - /** - * Define an interface that supports the PubSub and BigQuery example options. - */ - public static interface DataflowExampleUtilsOptions - extends DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions { - } - - public DataflowExampleUtils(DataflowPipelineOptions options) { - this.options = options; - } - - /** - * Do resources and runner options setup. - */ - public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded) - throws IOException { - this.options = options; - setupResourcesAndRunner(isUnbounded); - } - - /** - * Sets up external resources that are required by the example, - * such as Pub/Sub topics and BigQuery tables. - * - * @throws IOException if there is a problem setting up the resources - */ - public void setup() throws IOException { - setupPubsubTopic(); - setupBigQueryTable(); - } - - /** - * Set up external resources, and configure the runner appropriately. - */ - public void setupResourcesAndRunner(boolean isUnbounded) throws IOException { - if (isUnbounded) { - options.setStreaming(true); - } - setup(); - setupRunner(); - } - - /** - * Sets up the Google Cloud Pub/Sub topic. - * - * <p>If the topic doesn't exist, a new topic with the given name will be created. 
- * - * @throws IOException if there is a problem setting up the Pub/Sub topic - */ - public void setupPubsubTopic() throws IOException { - ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class); - if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) { - pendingMessages.add("*******************Set Up Pubsub Topic*********************"); - setupPubsubTopic(pubsubTopicOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been set up for this example: " - + pubsubTopicOptions.getPubsubTopic()); - } - } - - /** - * Sets up the BigQuery table with the given schema. - * - * <p>If the table already exists, the schema has to match the given one. Otherwise, the example - * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema - * will be created. - * - * @throws IOException if there is a problem setting up the BigQuery table - */ - public void setupBigQueryTable() throws IOException { - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("******************Set Up Big Query Table*******************"); - setupBigQueryTable(bigQueryTableOptions.getProject(), - bigQueryTableOptions.getBigQueryDataset(), - bigQueryTableOptions.getBigQueryTable(), - bigQueryTableOptions.getBigQuerySchema()); - pendingMessages.add("The BigQuery table has been set up for this example: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - } - } - - /** - * Tears down external resources that can be deleted upon the example's completion. - */ - private void tearDown() { - pendingMessages.add("*************************Tear Down*************************"); - ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class); - if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) { - try { - deletePubsubTopic(pubsubTopicOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been deleted: " - + pubsubTopicOptions.getPubsubTopic()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub topic : " - + pubsubTopicOptions.getPubsubTopic()); - } - } - - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("The BigQuery table might contain the example's output, " - + "and it is not deleted automatically: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - pendingMessages.add("Please go to the Developers Console to delete it manually." 
- + " Otherwise, you may be charged for its usage."); - } - } - - private void setupBigQueryTable(String projectId, String datasetId, String tableId, - TableSchema schema) throws IOException { - if (bigQueryClient == null) { - bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build(); - } - - Datasets datasetService = bigQueryClient.datasets(); - if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) { - Dataset newDataset = new Dataset().setDatasetReference( - new DatasetReference().setProjectId(projectId).setDatasetId(datasetId)); - datasetService.insert(projectId, newDataset).execute(); - } - - Tables tableService = bigQueryClient.tables(); - Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId)); - if (table == null) { - Table newTable = new Table().setSchema(schema).setTableReference( - new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId)); - tableService.insert(projectId, datasetId, newTable).execute(); - } else if (!table.getSchema().equals(schema)) { - throw new RuntimeException( - "Table exists and schemas do not match, expecting: " + schema.toPrettyString() - + ", actual: " + table.getSchema().toPrettyString()); - } - } - - private void setupPubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { - pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub topic. - * - * @throws IOException if there is a problem deleting the Pub/Sub topic - */ - private void deletePubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { - pubsubClient.projects().topics().delete(topic).execute(); - } - } - - /** - * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined, - * start an 'injector' pipeline that publishes the contents of the file to the given topic, first - * creating the topic if necessary. - */ - public void startInjectorIfNeeded(String inputFile) { - ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class); - if (pubsubTopicOptions.isStreaming() - && inputFile != null && !inputFile.isEmpty() - && pubsubTopicOptions.getPubsubTopic() != null - && !pubsubTopicOptions.getPubsubTopic().isEmpty()) { - runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic()); - } - } - - public void setupRunner() { - if (options.isStreaming() && options.getRunner().equals(BlockingDataflowRunner.class)) { - // In order to cancel the pipelines automatically, - // {@literal DataflowRunner} is forced to be used. - options.setRunner(DataflowRunner.class); - } - } - - /** - * Runs the batch injector for the streaming pipeline. - * - * <p>The injector pipeline will read from the given text file, and inject data - * into the Google Cloud Pub/Sub topic. 
- */ - public void runInjectorPipeline(String inputFile, String topic) { - DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class); - copiedOptions.setStreaming(false); - copiedOptions.setWorkerHarnessContainerImage( - DataflowRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE); - copiedOptions.setNumWorkers( - options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers()); - copiedOptions.setJobName(options.getJobName() + "-injector"); - Pipeline injectorPipeline = Pipeline.create(copiedOptions); - injectorPipeline.apply(TextIO.Read.from(inputFile)) - .apply(IntraBundleParallelization - .of(PubsubFileInjector.publish(topic)) - .withMaxParallelism(20)); - DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run(); - jobsToCancel.add(injectorJob); - } - - /** - * Runs the provided injector pipeline for the streaming pipeline. - */ - public void runInjectorPipeline(Pipeline injectorPipeline) { - DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run(); - jobsToCancel.add(injectorJob); - } - - /** - * Start the auxiliary injector pipeline, then wait for this pipeline to finish. - */ - public void mockUnboundedSource(String inputFile, PipelineResult result) { - startInjectorIfNeeded(inputFile); - waitToFinish(result); - } - - /** - * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used, - * waits for the pipeline to finish and cancels it (and the injector) before the program exists. - */ - public void waitToFinish(PipelineResult result) { - if (result instanceof DataflowPipelineJob) { - final DataflowPipelineJob job = (DataflowPipelineJob) result; - jobsToCancel.add(job); - if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) { - addShutdownHook(jobsToCancel); - } - try { - job.waitUntilFinish(); - } catch (Exception e) { - throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId()); - } - } else { - // Do nothing if the given PipelineResult doesn't support waitUntilFinish(), - // such as EvaluationResults returned by DirectRunner. - } - } - - private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) { - if (dataflowClient == null) { - dataflowClient = options.getDataflowClient(); - } - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - tearDown(); - printPendingMessages(); - for (DataflowPipelineJob job : jobs) { - System.out.println("Canceling example pipeline: " + job.getJobId()); - try { - job.cancel(); - } catch (IOException e) { - System.out.println("Failed to cancel the job," - + " please go to the Developers Console to cancel it manually"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - - for (DataflowPipelineJob job : jobs) { - boolean cancellationVerified = false; - for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) { - if (job.getState().isTerminal()) { - cancellationVerified = true; - System.out.println("Canceled example pipeline: " + job.getJobId()); - break; - } else { - System.out.println( - "The example pipeline is still running. 
Verifying the cancellation."); - } - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // Ignore - } - } - if (!cancellationVerified) { - System.out.println("Failed to verify the cancellation for job: " + job.getJobId()); - System.out.println("Please go to the Developers Console to verify manually:"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - } - }); - } - - private void printPendingMessages() { - System.out.println(); - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - for (String message : pendingMessages) { - System.out.println(message); - } - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - } - - private static <T> T executeNullIfNotFound( - AbstractGoogleClientRequest<T> request) throws IOException { - try { - return request.execute(); - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) { - return null; - } else { - throw e; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java index 79fa865..96e8406 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java @@ -18,19 +18,19 @@ package ${package}.common; import com.google.api.services.bigquery.model.TableSchema; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; /** - * Options that can be used to configure BigQuery tables in Dataflow examples. + * Options that can be used to configure BigQuery tables in Beam examples. * The project defaults to the project being used to run the example. 
*/ -public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions { +public interface ExampleBigQueryTableOptions extends GcpOptions { @Description("BigQuery dataset name") - @Default.String("dataflow_examples") + @Default.String("beam_examples") String getBigQueryDataset(); void setBigQueryDataset(String dataset); @@ -49,8 +49,7 @@ public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions { static class BigQueryTableFactory implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { - return options.as(DataflowPipelineOptions.class).getJobName() - .replace('-', '_'); + return options.getJobName().replace('-', '_'); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java new file mode 100644 index 0000000..90f935c --- /dev/null +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ${package}.common; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Options that can be used to configure the Beam examples. 
+ */ +public interface ExampleOptions extends PipelineOptions { + @Description("Whether to keep jobs running after local process exit") + @Default.Boolean(false) + boolean getKeepJobsRunning(); + void setKeepJobsRunning(boolean keepJobsRunning); + + @Description("Number of workers to use when executing the injector pipeline") + @Default.Integer(1) + int getInjectorNumWorkers(); + void setInjectorNumWorkers(int numWorkers); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java new file mode 100644 index 0000000..e3fb132 --- /dev/null +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicAndSubscriptionOptions.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ${package}.common; + +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.GcpOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Options that can be used to configure Pub/Sub topic/subscription in Beam examples. + */ +public interface ExamplePubsubTopicAndSubscriptionOptions extends ExamplePubsubTopicOptions { + @Description("Pub/Sub subscription") + @Default.InstanceFactory(PubsubSubscriptionFactory.class) + String getPubsubSubscription(); + void setPubsubSubscription(String subscription); + + /** + * Returns a default Pub/Sub subscription based on the project and the job names. 
+ */ + static class PubsubSubscriptionFactory implements DefaultValueFactory<String> { + @Override + public String create(PipelineOptions options) { + return "projects/" + options.as(GcpOptions.class).getProject() + + "/subscriptions/" + options.getJobName(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java index 8a7c9cf..1825267 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java @@ -17,36 +17,29 @@ */ package ${package}.common; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; /** - * Options that can be used to configure Pub/Sub topic in Dataflow examples. + * Options that can be used to configure Pub/Sub topic in Beam examples. */ -public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions { +public interface ExamplePubsubTopicOptions extends GcpOptions { @Description("Pub/Sub topic") @Default.InstanceFactory(PubsubTopicFactory.class) String getPubsubTopic(); void setPubsubTopic(String topic); - @Description("Number of workers to use when executing the injector pipeline") - @Default.Integer(1) - int getInjectorNumWorkers(); - void setInjectorNumWorkers(int numWorkers); - /** * Returns a default Pub/Sub topic based on the project and the job names. */ static class PubsubTopicFactory implements DefaultValueFactory<String> { @Override public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowPipelineOptions = - options.as(DataflowPipelineOptions.class); - return "projects/" + dataflowPipelineOptions.getProject() - + "/topics/" + dataflowPipelineOptions.getJobName(); + return "projects/" + options.as(GcpOptions.class).getProject() + + "/topics/" + options.getJobName(); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java new file mode 100644 index 0000000..afef188 --- /dev/null +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleUtils.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ${package}.common;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Datasets;
+import com.google.api.services.bigquery.Bigquery.Tables;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Transport;
+import org.joda.time.Duration;
+
+/**
+ * The utility class that sets up and tears down external resources,
+ * and cancels the streaming pipelines once the program terminates.
+ *
+ * <p>It is used to run Beam examples, such as TrafficMaxLaneFlow and TrafficRoutes.
+ */
+public class ExampleUtils {
+
+  private static final int SC_NOT_FOUND = 404;
+
+  private final PipelineOptions options;
+  private Bigquery bigQueryClient = null;
+  private Pubsub pubsubClient = null;
+  private Set<PipelineResult> pipelinesToCancel = Sets.newHashSet();
+  private List<String> pendingMessages = Lists.newArrayList();
+
+  /**
+   * Constructs a utility for resource and runner options setup.
+   */
+  public ExampleUtils(PipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Sets up external resources that are required by the example,
+   * such as Pub/Sub topics and BigQuery tables.
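+   * Transient Google API failures are retried a few times with exponential backoff
+   * (see the FluentBackoff loop below) before the last exception is rethrown.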
+   *
+   * @throws IOException if there is a problem setting up the resources
+   */
+  public void setup() throws IOException {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backOff =
+        FluentBackoff.DEFAULT
+            .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff();
+    Throwable lastException = null;
+    try {
+      do {
+        try {
+          setupPubsub();
+          setupBigQueryTable();
+          return;
+        } catch (GoogleJsonResponseException e) {
+          lastException = e;
+        }
+      } while (BackOffUtils.next(sleeper, backOff));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // Restore the interrupted status and fall through to report the last exception.
+    }
+    throw new RuntimeException(lastException);
+  }
+
+  /**
+   * Sets up the Google Cloud Pub/Sub topic.
+   *
+   * <p>If the topic doesn't exist, a new topic with the given name will be created.
+   *
+   * @throws IOException if there is a problem setting up the Pub/Sub topic
+   */
+  public void setupPubsub() throws IOException {
+    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+      pendingMessages.add("**********************Set Up Pubsub************************");
+      setupPubsubTopic(pubsubOptions.getPubsubTopic());
+      pendingMessages.add("The Pub/Sub topic has been set up for this example: "
+          + pubsubOptions.getPubsubTopic());
+
+      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+        setupPubsubSubscription(
+            pubsubOptions.getPubsubTopic(), pubsubOptions.getPubsubSubscription());
+        pendingMessages.add("The Pub/Sub subscription has been set up for this example: "
+            + pubsubOptions.getPubsubSubscription());
+      }
+    }
+  }
+
+  /**
+   * Sets up the BigQuery table with the given schema.
+   *
+   * <p>If the table already exists, the schema has to match the given one. Otherwise, the example
+   * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
+   * will be created.
+   *
+   * @throws IOException if there is a problem setting up the BigQuery table
+   */
+  public void setupBigQueryTable() throws IOException {
+    ExampleBigQueryTableOptions bigQueryTableOptions =
+        options.as(ExampleBigQueryTableOptions.class);
+    if (bigQueryTableOptions.getBigQueryDataset() != null
+        && bigQueryTableOptions.getBigQueryTable() != null
+        && bigQueryTableOptions.getBigQuerySchema() != null) {
+      pendingMessages.add("******************Set Up BigQuery Table*******************");
+      setupBigQueryTable(bigQueryTableOptions.getProject(),
+          bigQueryTableOptions.getBigQueryDataset(),
+          bigQueryTableOptions.getBigQueryTable(),
+          bigQueryTableOptions.getBigQuerySchema());
+      pendingMessages.add("The BigQuery table has been set up for this example: "
+          + bigQueryTableOptions.getProject()
+          + ":" + bigQueryTableOptions.getBigQueryDataset()
+          + "." + bigQueryTableOptions.getBigQueryTable());
+    }
+  }
+
+  /**
+   * Tears down external resources that can be deleted upon the example's completion.
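+   * Only the Pub/Sub topic and subscription are deleted; the BigQuery table is left
+   * in place because it may contain the example's output.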
+   */
+  private void tearDown() {
+    pendingMessages.add("*************************Tear Down*************************");
+    ExamplePubsubTopicAndSubscriptionOptions pubsubOptions =
+        options.as(ExamplePubsubTopicAndSubscriptionOptions.class);
+    if (!pubsubOptions.getPubsubTopic().isEmpty()) {
+      try {
+        deletePubsubTopic(pubsubOptions.getPubsubTopic());
+        pendingMessages.add("The Pub/Sub topic has been deleted: "
+            + pubsubOptions.getPubsubTopic());
+      } catch (IOException e) {
+        pendingMessages.add("Failed to delete the Pub/Sub topic: "
+            + pubsubOptions.getPubsubTopic());
+      }
+      if (!pubsubOptions.getPubsubSubscription().isEmpty()) {
+        try {
+          deletePubsubSubscription(pubsubOptions.getPubsubSubscription());
+          pendingMessages.add("The Pub/Sub subscription has been deleted: "
+              + pubsubOptions.getPubsubSubscription());
+        } catch (IOException e) {
+          pendingMessages.add("Failed to delete the Pub/Sub subscription: "
+              + pubsubOptions.getPubsubSubscription());
+        }
+      }
+    }
+
+    ExampleBigQueryTableOptions bigQueryTableOptions =
+        options.as(ExampleBigQueryTableOptions.class);
+    if (bigQueryTableOptions.getBigQueryDataset() != null
+        && bigQueryTableOptions.getBigQueryTable() != null
+        && bigQueryTableOptions.getBigQuerySchema() != null) {
+      pendingMessages.add("The BigQuery table might contain the example's output, "
+          + "and it is not deleted automatically: "
+          + bigQueryTableOptions.getProject()
+          + ":" + bigQueryTableOptions.getBigQueryDataset()
+          + "." + bigQueryTableOptions.getBigQueryTable());
+      pendingMessages.add("Please go to the Developers Console to delete it manually."
+          + " Otherwise, you may be charged for its usage.");
+    }
+  }
+
+  private void setupBigQueryTable(String projectId, String datasetId, String tableId,
+      TableSchema schema) throws IOException {
+    if (bigQueryClient == null) {
+      bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
+    }
+
+    Datasets datasetService = bigQueryClient.datasets();
+    if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
+      Dataset newDataset = new Dataset().setDatasetReference(
+          new DatasetReference().setProjectId(projectId).setDatasetId(datasetId));
+      datasetService.insert(projectId, newDataset).execute();
+    }
+
+    Tables tableService = bigQueryClient.tables();
+    Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId));
+    if (table == null) {
+      Table newTable = new Table().setSchema(schema).setTableReference(
+          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId));
+      tableService.insert(projectId, datasetId, newTable).execute();
+    } else if (!table.getSchema().equals(schema)) {
+      throw new RuntimeException(
+          "Table exists and schemas do not match, expecting: " + schema.toPrettyString()
+          + ", actual: " + table.getSchema().toPrettyString());
+    }
+  }
+
+  private void setupPubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) {
+      pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute();
+    }
+  }
+
+  private void setupPubsubSubscription(String topic, String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) == null) {
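+      // get() returned 404 (mapped to null above), so the subscription does not exist yet;
+      // create it with a 60-second acknowledgement deadline.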
+      Subscription subInfo = new Subscription()
+          .setAckDeadlineSeconds(60)
+          .setTopic(topic);
+      pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
+    }
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub topic.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub topic
+   */
+  private void deletePubsubTopic(String topic) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
+      pubsubClient.projects().topics().delete(topic).execute();
+    }
+  }
+
+  /**
+   * Deletes the Google Cloud Pub/Sub subscription.
+   *
+   * @throws IOException if there is a problem deleting the Pub/Sub subscription
+   */
+  private void deletePubsubSubscription(String subscription) throws IOException {
+    if (pubsubClient == null) {
+      pubsubClient = Transport.newPubsubClient(options.as(PubsubOptions.class)).build();
+    }
+    if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
+      pubsubClient.projects().subscriptions().delete(subscription).execute();
+    }
+  }
+
+  /**
+   * If {@literal DataflowRunner} or {@literal BlockingDataflowRunner} is used,
+   * waits for the pipeline to finish and cancels it (and the injector) before the program exits.
+   */
+  public void waitToFinish(PipelineResult result) {
+    pipelinesToCancel.add(result);
+    if (!options.as(ExampleOptions.class).getKeepJobsRunning()) {
+      addShutdownHook(pipelinesToCancel);
+    }
+    try {
+      result.waitUntilFinish();
+    } catch (UnsupportedOperationException e) {
+      // The given PipelineResult doesn't support waitUntilFinish(), such as the
+      // EvaluationResults returned by DirectRunner; tear down and report immediately.
+      tearDown();
+      printPendingMessages();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to wait for the pipeline to finish: " + result);
+    }
+  }
+
+  private void addShutdownHook(final Collection<PipelineResult> pipelineResults) {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        tearDown();
+        printPendingMessages();
+        for (PipelineResult pipelineResult : pipelineResults) {
+          try {
+            pipelineResult.cancel();
+          } catch (IOException e) {
+            System.out.println("Failed to cancel the job.");
+            System.out.println(e.getMessage());
+          }
+        }
+
+        for (PipelineResult pipelineResult : pipelineResults) {
+          boolean cancellationVerified = false;
+          for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) {
+            if (pipelineResult.getState().isTerminal()) {
+              cancellationVerified = true;
+              break;
+            } else {
+              System.out.println(
+                  "The example pipeline is still running. Verifying the cancellation.");
+            }
+            Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
+          }
+          if (!cancellationVerified) {
+            System.out.println("Failed to verify the cancellation for job: " + pipelineResult);
+          }
+        }
+      }
+    });
+  }
+
+  private void printPendingMessages() {
+    System.out.println();
+    System.out.println("***********************************************************");
+    System.out.println("***********************************************************");
+    for (String message : pendingMessages) {
+      System.out.println(message);
+    }
+    System.out.println("***********************************************************");
+    System.out.println("***********************************************************");
+  }
+
+  private static <T> T executeNullIfNotFound(
+      AbstractGoogleClientRequest<T> request) throws IOException {
+    try {
+      return request.execute();
+    } catch (GoogleJsonResponseException e) {
+      if (e.getStatusCode() == SC_NOT_FOUND) {
+        return null;
+      } else {
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
index 58e0821..6ca20f3 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
@@ -23,15 +23,15 @@ import com.google.api.services.pubsub.model.PubsubMessage;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Arrays;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.IntraBundleParallelization;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.Transport;
 
 /**
@@ -69,7 +69,7 @@ public class PubsubFileInjector {
     }
   }
 
-  /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
+  /** An {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */
   public static class Bound extends OldDoFn<String, Void> {
     private final String outputTopic;
     private final String timestampLabelKey;
@@ -83,7 +83,7 @@ public class PubsubFileInjector {
     @Override
     public void startBundle(Context context) {
       this.pubsub =
-          Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class))
+          Transport.newPubsubClient(context.getPipelineOptions().as(PubsubOptions.class))
               .build();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9a66e4b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
index 875d3d7..83d0f37 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java
@@ -20,6 +20,7 @@ package ${package};
 import ${package}.WordCount.CountWords;
 import ${package}.WordCount.ExtractWordsFn;
 import ${package}.WordCount.FormatAsTextFn;
+
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
@@ -28,8 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
@@ -38,14 +40,13 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-
 /**
  * Tests of WordCount.
  */
 @RunWith(JUnit4.class)
 public class WordCountTest {
 
-  /** Example test that tests a specific DoFn. */
+  /** Example test that exercises a specific {@link DoFn}. */
   @Test
   public void testExtractWordsFn() throws Exception {
     DoFnTester<String, String> extractWordsFn =
@@ -77,7 +78,7 @@ public class WordCountTest {
     PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
 
     PCollection<String> output = input.apply(new CountWords())
-      .apply(ParDo.of(new FormatAsTextFn()));
+      .apply(MapElements.via(new FormatAsTextFn()));
 
     PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
     p.run();
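
A note on the WordCountTest hunk above: MapElements.via(...) works here because FormatAsTextFn in these examples is a SimpleFunction rather than a bare DoFn, so the transform carries its own type information and needs no ParDo wrapping. A minimal sketch of the pattern, assuming a Beam Java SDK of this vintage (the class body is illustrative, not copied from the commit):

    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;

    // A SimpleFunction declares its input and output types, so MapElements.via()
    // needs no separate TypeDescriptor and no ProcessContext plumbing.
    public class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
      @Override
      public String apply(KV<String, Long> input) {
        return input.getKey() + ": " + input.getValue();
      }
    }

    // Usage inside a pipeline:
    //   counts.apply(MapElements.via(new FormatAsTextFn()));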

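More generally, the @Default.InstanceFactory pattern used by the Pub/Sub options in this commit computes an option's default lazily, from the other options, at first access. A hedged sketch of the same DefaultValueFactory contract with a made-up option (MyExampleOptions and the bucket name are hypothetical, not part of this commit):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.DefaultValueFactory;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    public interface MyExampleOptions extends PipelineOptions {
      @Description("Output location; defaults to a path derived from the job name")
      @Default.InstanceFactory(OutputPrefixFactory.class)
      String getOutputPrefix();
      void setOutputPrefix(String value);

      /** Invoked only when the user did not set the option explicitly. */
      class OutputPrefixFactory implements DefaultValueFactory<String> {
        @Override
        public String create(PipelineOptions options) {
          // "gs://my-bucket" is an illustrative placeholder.
          return "gs://my-bucket/" + options.getJobName();
        }
      }
    }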