Directory reorganization: move Java-specific archetypes from "maven-archetypes/" into "sdks/java/maven-archetypes/".
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/257a7a6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/257a7a6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/257a7a6b Branch: refs/heads/master Commit: 257a7a6be0cce4d08be749af159ec8a6adb7ceb9 Parents: d4233aa Author: Davor Bonaci <[email protected]> Authored: Wed Mar 23 17:47:11 2016 -0700 Committer: Davor Bonaci <[email protected]> Committed: Wed Mar 23 18:33:33 2016 -0700 ---------------------------------------------------------------------- maven-archetypes/examples/pom.xml | 56 --- .../META-INF/maven/archetype-metadata.xml | 29 -- .../main/resources/archetype-resources/pom.xml | 204 ---------- .../src/main/java/DebuggingWordCount.java | 182 --------- .../src/main/java/MinimalWordCount.java | 115 ------ .../src/main/java/WindowedWordCount.java | 262 ------------ .../src/main/java/WordCount.java | 204 ---------- .../java/common/DataflowExampleOptions.java | 29 -- .../main/java/common/DataflowExampleUtils.java | 398 ------------------- .../common/ExampleBigQueryTableOptions.java | 53 --- .../java/common/ExamplePubsubTopicOptions.java | 49 --- .../main/java/common/PubsubFileInjector.java | 153 ------- .../src/test/java/DebuggingWordCountTest.java | 44 -- .../src/test/java/WordCountTest.java | 85 ---- .../projects/basic/archetype.properties | 5 - .../src/test/resources/projects/basic/goal.txt | 1 - maven-archetypes/pom.xml | 41 -- maven-archetypes/starter/pom.xml | 57 --- .../META-INF/maven/archetype-metadata.xml | 21 - .../main/resources/archetype-resources/pom.xml | 43 -- .../src/main/java/StarterPipeline.java | 67 ---- .../projects/basic/archetype.properties | 5 - .../src/test/resources/projects/basic/goal.txt | 1 - .../resources/projects/basic/reference/pom.xml | 43 -- .../src/main/java/it/pkg/StarterPipeline.java | 67 ---- pom.xml | 2 +- sdks/java/maven-archetypes/examples/pom.xml | 56 
+++ .../META-INF/maven/archetype-metadata.xml | 29 ++ .../main/resources/archetype-resources/pom.xml | 204 ++++++++++ .../src/main/java/DebuggingWordCount.java | 182 +++++++++ .../src/main/java/MinimalWordCount.java | 115 ++++++ .../src/main/java/WindowedWordCount.java | 262 ++++++++++++ .../src/main/java/WordCount.java | 204 ++++++++++ .../java/common/DataflowExampleOptions.java | 29 ++ .../main/java/common/DataflowExampleUtils.java | 398 +++++++++++++++++++ .../common/ExampleBigQueryTableOptions.java | 53 +++ .../java/common/ExamplePubsubTopicOptions.java | 49 +++ .../main/java/common/PubsubFileInjector.java | 153 +++++++ .../src/test/java/DebuggingWordCountTest.java | 44 ++ .../src/test/java/WordCountTest.java | 85 ++++ .../projects/basic/archetype.properties | 5 + .../src/test/resources/projects/basic/goal.txt | 1 + sdks/java/maven-archetypes/pom.xml | 41 ++ sdks/java/maven-archetypes/starter/pom.xml | 57 +++ .../META-INF/maven/archetype-metadata.xml | 21 + .../main/resources/archetype-resources/pom.xml | 43 ++ .../src/main/java/StarterPipeline.java | 67 ++++ .../projects/basic/archetype.properties | 5 + .../src/test/resources/projects/basic/goal.txt | 1 + .../resources/projects/basic/reference/pom.xml | 43 ++ .../src/main/java/it/pkg/StarterPipeline.java | 67 ++++ 51 files changed, 2215 insertions(+), 2215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/pom.xml ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/pom.xml b/maven-archetypes/examples/pom.xml deleted file mode 100644 index 7e74b9d..0000000 --- a/maven-archetypes/examples/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. 
See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>maven-archetypes-parent</artifactId> - <version>0.1.0-incubating-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>maven-archetypes-examples</artifactId> - <name>Apache Beam :: Maven Archetypes :: Examples</name> - <description>A Maven Archetype to create a project containing all the - example pipelines from the Apache Beam Java SDK.</description> - - <packaging>maven-archetype</packaging> - - <build> - <extensions> - <extension> - <groupId>org.apache.maven.archetype</groupId> - <artifactId>archetype-packaging</artifactId> - <version>2.4</version> - </extension> - </extensions> - - <pluginManagement> - <plugins> - <plugin> - <artifactId>maven-archetype-plugin</artifactId> - <version>2.4</version> - </plugin> - </plugins> - </pluginManagement> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml 
---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml b/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml deleted file mode 100644 index 7742af4..0000000 --- a/maven-archetypes/examples/src/main/resources/META-INF/maven/archetype-metadata.xml +++ /dev/null @@ -1,29 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<archetype-descriptor - xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd" - name="Google Cloud Dataflow Example Pipelines Archetype" - xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - - <requiredProperties> - <requiredProperty key="targetPlatform"> - <defaultValue>1.7</defaultValue> - </requiredProperty> - </requiredProperties> - - <fileSets> - <fileSet filtered="true" packaged="true" encoding="UTF-8"> - <directory>src/main/java</directory> - <includes> - <include>**/*.java</include> - </includes> - </fileSet> - - <fileSet filtered="true" packaged="true" encoding="UTF-8"> - <directory>src/test/java</directory> - <includes> - <include>**/*.java</include> - </includes> - </fileSet> - </fileSets> -</archetype-descriptor> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml deleted file mode 100644 index d19d0c6..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml +++ /dev/null @@ -1,204 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> 
-<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - ~ Copyright (C) 2015 Google Inc. - ~ - ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not - ~ use this file except in compliance with the License. You may obtain a copy of - ~ the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - ~ License for the specific language governing permissions and limitations under - ~ the License. - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <groupId>${groupId}</groupId> - <artifactId>${artifactId}</artifactId> - <version>${version}</version> - - <packaging>jar</packaging> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <source>${targetPlatform}</source> - <target>${targetPlatform}</target> - </configuration> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.3</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-bundled-${project.version}</finalName> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> 
- </excludes> - </filter> - </filters> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.18.1</version> - <configuration> - <parallel>all</parallel> - <threadCount>4</threadCount> - <redirectTestOutputToFile>true</redirectTestOutputToFile> - </configuration> - <dependencies> - <dependency> - <groupId>org.apache.maven.surefire</groupId> - <artifactId>surefire-junit47</artifactId> - <version>2.18.1</version> - </dependency> - </dependencies> - </plugin> - </plugins> - </build> - - <dependencies> - <!-- Adds a dependency on a specific version of the Dataflow SDK. --> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>java-sdk-all</artifactId> - <version>[0-incubating, 2-incubating)</version> - </dependency> - - <dependency> - <groupId>com.google.api-client</groupId> - <artifactId>google-api-client</artifactId> - <version>1.21.0</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- Dependencies below this line are specific dependencies needed by the examples code. 
--> - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-bigquery</artifactId> - <version>v2-rev248-1.21.0</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.http-client</groupId> - <artifactId>google-http-client</artifactId> - <version>1.21.0</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-pubsub</artifactId> - <version>v1-rev7-1.21.0</version> - <exclusions> - <!-- Exclude an old version of guava that is being pulled - in by a transitive dependency of google-api-client --> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava-jdk5</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> - <version>2.4</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>18.0</version> - </dependency> - - <dependency> - <groupId>javax.servlet</groupId> - <artifactId>javax.servlet-api</artifactId> - <version>3.1.0</version> - </dependency> - - <!-- Add slf4j API frontend binding with JUL backend --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>1.7.7</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - <version>1.7.7</version> - <!-- When loaded at runtime this will wire up slf4j to the JUL 
backend --> - <scope>runtime</scope> - </dependency> - - <!-- Hamcrest and JUnit are required dependencies of DataflowAssert, - which is used in the main code of DebuggingWordCount example. --> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>1.3</version> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.11</version> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java deleted file mode 100644 index 3cf2bc0..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package ${package}; - -import ${package}.WordCount.WordCountOptions; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.regex.Pattern; - - -/** - * An example that verifies word counts in Shakespeare and includes Dataflow best practices. - * - * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more - * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount} - * and {@link WordCount}. After you've looked at this example, then see the - * {@link WindowedWordCount} pipeline, for introduction of additional concepts. - * - * <p>Basic concepts, also in the MinimalWordCount and WordCount examples: - * Reading text files; counting a PCollection; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns. - * - * <p>New Concepts: - * <pre> - * 1. Logging to Cloud Logging - * 2. Controlling Dataflow worker log levels - * 3. Creating a custom aggregator - * 4. 
Testing your Pipeline via DataflowAssert - * </pre> - * - * <p>To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } - * </pre> - * - * <p>To execute this pipeline using the Dataflow service and the additional logging discussed - * below, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"} - * } - * </pre> - * - * <p>Note that when you run via <code>mvn exec</code>, you may need to escape - * the quotations as appropriate for your shell. For example, in <code>bash</code>: - * <pre> - * mvn compile exec:java ... \ - * -Dexec.args="... \ - * --workerLogLevelOverrides={\\\"com.google.cloud.dataflow.examples\\\":\\\"DEBUG\\\"}" - * </pre> - * - * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud - * Logging by default at "INFO" log level and higher. One may override log levels for specific - * logging namespaces by specifying: - * <pre><code> - * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...} - * </code></pre> - * For example, by specifying: - * <pre><code> - * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"} - * </code></pre> - * when executing this pipeline using the Dataflow service, Cloud Logging would contain only - * "DEBUG" or higher level logs for the {@code com.google.cloud.dataflow.examples} package in - * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker - * logging configuration can be overridden by specifying - * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. 
For example, - * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with - * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note - * that changing the default worker log level to TRACE or DEBUG will significantly increase - * the amount of logs output. - * - * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. - */ -public class DebuggingWordCount { - /** A DoFn that filters for a specific key based upon a regular expression. */ - public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { - /** - * Concept #1: The logger below uses the fully qualified class name of FilterTextFn - * as the logger. All log statements emitted by this logger will be referenced by this name - * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging - * about the Cloud Logging UI. - */ - private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); - - private final Pattern filter; - public FilterTextFn(String pattern) { - filter = Pattern.compile(pattern); - } - - /** - * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those - * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the - * Dataflow service. These aggregators below track the number of matched and unmatched words. - * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about - * the Dataflow Monitoring UI. 
- */ - private final Aggregator<Long, Long> matchedWords = - createAggregator("matchedWords", new Sum.SumLongFn()); - private final Aggregator<Long, Long> unmatchedWords = - createAggregator("umatchedWords", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (filter.matcher(c.element().getKey()).matches()) { - // Log at the "DEBUG" level each element that we match. When executing this pipeline - // using the Dataflow service, these log lines will appear in the Cloud Logging UI - // only if the log level is set to "DEBUG" or lower. - LOG.debug("Matched: " + c.element().getKey()); - matchedWords.addValue(1L); - c.output(c.element()); - } else { - // Log at the "TRACE" level each element that is not matched. Different log levels - // can be used to control the verbosity of logging providing an effective mechanism - // to filter less important information. - LOG.trace("Did not match: " + c.element().getKey()); - unmatchedWords.addValue(1L); - } - } - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - PCollection<KV<String, Long>> filteredWords = - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) - .apply(new WordCount.CountWords()) - .apply(ParDo.of(new FilterTextFn("Flourish|stomach"))); - - /** - * Concept #4: DataflowAssert is a set of convenient PTransforms in the style of - * Hamcrest's collection matchers that can be used when writing Pipeline level tests - * to validate the contents of PCollections. DataflowAssert is best used in unit tests - * with small data sets but is demonstrated here as a teaching tool. - * - * <p>Below we verify that the set of filtered words matches our expected counts. Note - * that DataflowAssert does not provide any output and that successful completion of the - * Pipeline implies that the expectations were met. 
Learn more at - * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test - * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test. - */ - List<KV<String, Long>> expectedResults = Arrays.asList( - KV.of("Flourish", 3L), - KV.of("stomach", 1L)); - DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java deleted file mode 100644 index 035db01..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package ${package}; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.KV; - - -/** - * An example that counts words in Shakespeare. - * - * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more - * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or - * argument processing, and focus on construction of the pipeline, which chains together the - * application of core transforms. - * - * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally - * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional - * concepts. - * - * <p>Concepts: - * <pre> - * 1. Reading data from text files - * 2. Specifying 'inline' transforms - * 3. Counting a PCollection - * 4. Writing data to Cloud Storage as text files - * </pre> - * - * <p>To execute this pipeline, first edit the code to set your project ID, the staging - * location, and the output location. The specified GCS bucket(s) must already exist. - * - * <p>Then, run the pipeline as described in the README. It will be deployed and run using the - * Dataflow service. No args are required to run the pipeline. You can see the results in your - * output bucket in the GCS browser. - */ -public class MinimalWordCount { - - public static void main(String[] args) { - // Create a DataflowPipelineOptions object. 
This object lets us set various execution - // options for our pipeline, such as the associated Cloud Platform project and the location - // in Google Cloud Storage to stage files. - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - options.setRunner(BlockingDataflowPipelineRunner.class); - // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud. - options.setProject("SET_YOUR_PROJECT_ID_HERE"); - // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files. - options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY"); - - // Create the Pipeline object with the options we defined above. - Pipeline p = Pipeline.create(options); - - // Apply the pipeline's transforms. - - // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set - // of input text files. TextIO.Read returns a PCollection where each element is one line from - // the input text (a set of Shakespeare's texts). - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) - // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a - // DoFn (defined in-line) on each element that tokenizes the text line into individual words. - // The ParDo returns a PCollection<String>, where each element is an individual word in - // Shakespeare's collected texts. - .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) { - for (String word : c.element().split("[^a-zA-Z']+")) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - })) - // Concept #3: Apply the Count transform to our PCollection of individual words. The Count - // transform returns a new PCollection of key/value pairs, where each key represents a unique - // word in the text. The associated value is the occurrence count for that word. 
- .apply(Count.<String>perElement()) - // Apply another ParDo transform that formats our PCollection of word counts into a printable - // string, suitable for writing to an output file. - .apply(ParDo.named("FormatResults").of(new DoFn<KV<String, Long>, String>() { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().getKey() + ": " + c.element().getValue()); - } - })) - // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline. - // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of - // formatted strings) to a series of text files in Google Cloud Storage. - // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. - .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); - - // Run the pipeline. - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java deleted file mode 100644 index 29921e2..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import ${package}.common.DataflowExampleUtils; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - - -/** - * An example that counts words in text, and can run over either unbounded or bounded input - * collections. 
- * - * <p>This class, {@link WindowedWordCount}, is the last in a series of four successively more - * detailed 'word count' examples. First take a look at {@link MinimalWordCount}, - * {@link WordCount}, and {@link DebuggingWordCount}. - * - * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples: - * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns; creating a custom aggregator; - * user-defined PTransforms; defining PipelineOptions. - * - * <p>New Concepts: - * <pre> - * 1. Unbounded and bounded pipeline input modes - * 2. Adding timestamps to data - * 3. PubSub topics as sources - * 4. Windowing - * 5. Re-using PTransforms over windowed PCollections - * 6. Writing to BigQuery - * </pre> - * - * <p>To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } - * </pre> - * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * } - * </pre> - * - * <p>Optionally specify the input file path via: - * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}. - * - * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't - * specify the table, one will be created for you using the job name. If you don't specify the - * dataset, a dataset called {@code dataflow-examples} must already exist in your project. - * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}. - * - * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or - * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set - * {@code --unbounded=true}. 
Then, optionally specify the Google Cloud PubSub topic to read from - * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not - * exist, the pipeline will create one for you. It will delete this topic when it terminates. - * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub - * topic with the contents of the {@code --inputFile}, in order to make the example easy to run. - * If you want to use an independently-populated PubSub topic, indicate this by setting - * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started. - * - * <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can - * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10} - * for 10-minute windows. - */ -public class WindowedWordCount { - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); - static final int WINDOW_SIZE = 1; // Default window duration in minutes - - /** - * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for - * this example, for the bounded data case. - * - * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate - * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a - * 2-hour period. - */ - static class AddTimestampFn extends DoFn<String, String> { - private static final long RAND_RANGE = 7200000; // 2 hours in ms - - @Override - public void processElement(ProcessContext c) { - // Generate a timestamp that falls somewhere in the past two hours. - long randomTimestamp = System.currentTimeMillis() - - (int) (Math.random() * RAND_RANGE); - /** - * Concept #2: Set the data element with that timestamp. - */ - c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); - } - } - - /** A DoFn that converts a Word and Count into a BigQuery table row. 
*/ - static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> { - @Override - public void processElement(ProcessContext c) { - TableRow row = new TableRow() - .set("word", c.element().getKey()) - .set("count", c.element().getValue()) - // include a field for the window timestamp - .set("window_timestamp", c.timestamp().toString()); - c.output(row); - } - } - - /** - * Helper method that defines the BigQuery schema used for the output. - */ - private static TableSchema getSchema() { - List<TableFieldSchema> fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("word").setType("STRING")); - fields.add(new TableFieldSchema().setName("count").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP")); - TableSchema schema = new TableSchema().setFields(fields); - return schema; - } - - /** - * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one - * that supports both bounded and unbounded data. This is a helper method that creates a - * TableReference from input options, to tell the pipeline where to write its BigQuery results. - */ - private static TableReference getTableReference(Options options) { - TableReference tableRef = new TableReference(); - tableRef.setProjectId(options.getProject()); - tableRef.setDatasetId(options.getBigQueryDataset()); - tableRef.setTableId(options.getBigQueryTable()); - return tableRef; - } - - /** - * Options supported by {@link WindowedWordCount}. - * - * <p>Inherits standard example configuration options, which allow specification of the BigQuery - * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for - * specification of the input file. 
- */ - public static interface Options - extends WordCount.WordCountOptions, DataflowExampleUtils.DataflowExampleUtilsOptions { - @Description("Fixed window duration, in minutes") - @Default.Integer(WINDOW_SIZE) - Integer getWindowSize(); - void setWindowSize(Integer value); - - @Description("Whether to run the pipeline with unbounded input") - boolean isUnbounded(); - void setUnbounded(boolean value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setBigQuerySchema(getSchema()); - // DataflowExampleUtils creates the necessary input sources to simplify execution of this - // Pipeline. - DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options, - options.isUnbounded()); - - Pipeline pipeline = Pipeline.create(options); - - /** - * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or - * unbounded input source. - */ - PCollection<String> input; - if (options.isUnbounded()) { - LOG.info("Reading from PubSub."); - /** - * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't - * specified as an argument. The data elements' timestamps will come from the pubsub - * injection. - */ - input = pipeline - .apply(PubsubIO.Read.topic(options.getPubsubTopic())); - } else { - /** Else, this is a bounded pipeline. Read from the GCS file. */ - input = pipeline - .apply(TextIO.Read.from(options.getInputFile())) - // Concept #2: Add an element timestamp, using an artificial time just to show windowing. - // See AddTimestampFn for more detail on this. - .apply(ParDo.of(new AddTimestampFn())); - } - - /** - * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1 - * minute (you can change this with a command-line option). 
See the documentation for more - * information on how fixed windows work, and for information on the other types of windowing - * available (e.g., sliding windows). - */ - PCollection<String> windowedWords = input - .apply(Window.<String>into( - FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); - - /** - * Concept #5: Re-use our existing CountWords transform that does not have knowledge of - * windows over a PCollection containing windowed values. - */ - PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords()); - - /** - * Concept #6: Format the results for a BigQuery table, then write to BigQuery. - * The BigQuery output source supports both bounded and unbounded data. - */ - wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) - .apply(BigQueryIO.Write.to(getTableReference(options)).withSchema(getSchema())); - - PipelineResult result = pipeline.run(); - - /** - * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that - * runs for a limited time, and publishes to the input PubSub topic. - * - * With an unbounded input source, you will need to explicitly shut down this pipeline when you - * are done with it, so that you do not continue to be charged for the instances. You can do - * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow - * pipelines. The PubSub topic will also be deleted at this time. 
- */ - exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java deleted file mode 100644 index 150b60d..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package ${package}; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - - -/** - * An example that counts words in Shakespeare and includes Dataflow best practices. - * - * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed - * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. - * After you've looked at this example, then see the {@link DebuggingWordCount} - * pipeline, for introduction of additional concepts. - * - * <p>For a detailed walkthrough of this example, see - * <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example"> - * https://cloud.google.com/dataflow/java-sdk/wordcount-example - * </a> - * - * <p>Basic concepts, also in the MinimalWordCount example: - * Reading text files; counting a PCollection; writing to GCS. - * - * <p>New Concepts: - * <pre> - * 1. Executing a Pipeline both locally and using the Dataflow service - * 2. Using ParDo with static DoFns defined out-of-line - * 3. Building a composite transform - * 4. 
Defining your own pipeline options - * </pre> - * - * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service. - * These are now command-line options and not hard-coded as they were in the MinimalWordCount - * example. - * To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } - * </pre> - * and a local output file or output prefix on GCS: - * <pre>{@code - * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * }</pre> - * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * } - * </pre> - * and an output prefix on GCS: - * <pre>{@code - * --output=gs://YOUR_OUTPUT_PREFIX - * }</pre> - * - * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. - */ -public class WordCount { - - /** - * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out- - * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the - * pipeline. - */ - static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** A DoFn that converts a Word and Count into a printable string. 
*/ - public static class FormatAsTextFn extends DoFn<KV<String, Long>, String> { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().getKey() + ": " + c.element().getValue()); - } - } - - /** - * A PTransform that converts a PCollection containing lines of text into a PCollection of - * formatted word counts. - * - * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and - * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse, - * modular testing, and an improved monitoring experience. - */ - public static class CountWords extends PTransform<PCollection<String>, - PCollection<KV<String, Long>>> { - @Override - public PCollection<KV<String, Long>> apply(PCollection<String> lines) { - - // Convert lines of text into individual words. - PCollection<String> words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - return wordCounts; - } - } - - /** - * Options supported by {@link WordCount}. - * - * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments - * to be processed by the command-line parser, and specify default values for them. You can then - * access the options values in your pipeline code. - * - * <p>Inherits standard configuration options. - */ - public static interface WordCountOptions extends PipelineOptions { - @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") - String getInputFile(); - void setInputFile(String value); - - @Description("Path of the file to write to") - @Default.InstanceFactory(OutputFactory.class) - String getOutput(); - void setOutput(String value); - - /** - * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination. 
- */ - public static class OutputFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - if (dataflowOptions.getStagingLocation() != null) { - return GcsPath.fromUri(dataflowOptions.getStagingLocation()) - .resolve("counts.txt").toString(); - } else { - throw new IllegalArgumentException("Must specify --output or --stagingLocation"); - } - } - } - - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the - // static FormatAsTextFn() to the ParDo transform. - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) - .apply(new CountWords()) - .apply(ParDo.of(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java deleted file mode 100644 index e182f4c..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleOptions.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}.common; - -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; - -/** - * Options that can be used to configure the Dataflow examples. - */ -public interface DataflowExampleOptions extends DataflowPipelineOptions { - @Description("Whether to keep jobs running on the Dataflow service after local process exit") - @Default.Boolean(false) - boolean getKeepJobsRunning(); - void setKeepJobsRunning(boolean keepJobsRunning); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java deleted file mode 100644 index 9861769..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/DataflowExampleUtils.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}.common; - -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Datasets; -import com.google.api.services.bigquery.Bigquery.Tables; -import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.DatasetReference; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.Topic; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.BigQueryOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization; -import com.google.cloud.dataflow.sdk.util.MonitoringUtil; -import com.google.cloud.dataflow.sdk.util.Transport; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import 
java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import javax.servlet.http.HttpServletResponse; - -/** - * The utility class that sets up and tears down external resources, starts the Google Cloud Pub/Sub - * injector, and cancels the streaming and the injector pipelines once the program terminates. - * - * <p>It is used to run Dataflow examples, such as TrafficMaxLaneFlow and TrafficRoutes. - */ -public class DataflowExampleUtils { - - private final DataflowPipelineOptions options; - private Bigquery bigQueryClient = null; - private Pubsub pubsubClient = null; - private Dataflow dataflowClient = null; - private Set<DataflowPipelineJob> jobsToCancel = Sets.newHashSet(); - private List<String> pendingMessages = Lists.newArrayList(); - - /** - * Define an interface that supports the PubSub and BigQuery example options. - */ - public static interface DataflowExampleUtilsOptions - extends DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions { - } - - public DataflowExampleUtils(DataflowPipelineOptions options) { - this.options = options; - } - - /** - * Do resources and runner options setup. - */ - public DataflowExampleUtils(DataflowPipelineOptions options, boolean isUnbounded) - throws IOException { - this.options = options; - setupResourcesAndRunner(isUnbounded); - } - - /** - * Sets up external resources that are required by the example, - * such as Pub/Sub topics and BigQuery tables. - * - * @throws IOException if there is a problem setting up the resources - */ - public void setup() throws IOException { - setupPubsubTopic(); - setupBigQueryTable(); - } - - /** - * Set up external resources, and configure the runner appropriately. 
- */ - public void setupResourcesAndRunner(boolean isUnbounded) throws IOException { - if (isUnbounded) { - options.setStreaming(true); - } - setup(); - setupRunner(); - } - - /** - * Sets up the Google Cloud Pub/Sub topic. - * - * <p>If the topic doesn't exist, a new topic with the given name will be created. - * - * @throws IOException if there is a problem setting up the Pub/Sub topic - */ - public void setupPubsubTopic() throws IOException { - ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class); - if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) { - pendingMessages.add("*******************Set Up Pubsub Topic*********************"); - setupPubsubTopic(pubsubTopicOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been set up for this example: " - + pubsubTopicOptions.getPubsubTopic()); - } - } - - /** - * Sets up the BigQuery table with the given schema. - * - * <p>If the table already exists, the schema has to match the given one. Otherwise, the example - * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema - * will be created. 
- * - * @throws IOException if there is a problem setting up the BigQuery table - */ - public void setupBigQueryTable() throws IOException { - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("******************Set Up Big Query Table*******************"); - setupBigQueryTable(bigQueryTableOptions.getProject(), - bigQueryTableOptions.getBigQueryDataset(), - bigQueryTableOptions.getBigQueryTable(), - bigQueryTableOptions.getBigQuerySchema()); - pendingMessages.add("The BigQuery table has been set up for this example: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - } - } - - /** - * Tears down external resources that can be deleted upon the example's completion. 
- */ - private void tearDown() { - pendingMessages.add("*************************Tear Down*************************"); - ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class); - if (!pubsubTopicOptions.getPubsubTopic().isEmpty()) { - try { - deletePubsubTopic(pubsubTopicOptions.getPubsubTopic()); - pendingMessages.add("The Pub/Sub topic has been deleted: " - + pubsubTopicOptions.getPubsubTopic()); - } catch (IOException e) { - pendingMessages.add("Failed to delete the Pub/Sub topic : " - + pubsubTopicOptions.getPubsubTopic()); - } - } - - ExampleBigQueryTableOptions bigQueryTableOptions = - options.as(ExampleBigQueryTableOptions.class); - if (bigQueryTableOptions.getBigQueryDataset() != null - && bigQueryTableOptions.getBigQueryTable() != null - && bigQueryTableOptions.getBigQuerySchema() != null) { - pendingMessages.add("The BigQuery table might contain the example's output, " - + "and it is not deleted automatically: " - + bigQueryTableOptions.getProject() - + ":" + bigQueryTableOptions.getBigQueryDataset() - + "." + bigQueryTableOptions.getBigQueryTable()); - pendingMessages.add("Please go to the Developers Console to delete it manually." 
- + " Otherwise, you may be charged for its usage."); - } - } - - private void setupBigQueryTable(String projectId, String datasetId, String tableId, - TableSchema schema) throws IOException { - if (bigQueryClient == null) { - bigQueryClient = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build(); - } - - Datasets datasetService = bigQueryClient.datasets(); - if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) { - Dataset newDataset = new Dataset().setDatasetReference( - new DatasetReference().setProjectId(projectId).setDatasetId(datasetId)); - datasetService.insert(projectId, newDataset).execute(); - } - - Tables tableService = bigQueryClient.tables(); - Table table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId)); - if (table == null) { - Table newTable = new Table().setSchema(schema).setTableReference( - new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId)); - tableService.insert(projectId, datasetId, newTable).execute(); - } else if (!table.getSchema().equals(schema)) { - throw new RuntimeException( - "Table exists and schemas do not match, expecting: " + schema.toPrettyString() - + ", actual: " + table.getSchema().toPrettyString()); - } - } - - private void setupPubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) == null) { - pubsubClient.projects().topics().create(topic, new Topic().setName(topic)).execute(); - } - } - - /** - * Deletes the Google Cloud Pub/Sub topic. 
- * - * @throws IOException if there is a problem deleting the Pub/Sub topic - */ - private void deletePubsubTopic(String topic) throws IOException { - if (pubsubClient == null) { - pubsubClient = Transport.newPubsubClient(options).build(); - } - if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) { - pubsubClient.projects().topics().delete(topic).execute(); - } - } - - /** - * If this is an unbounded (streaming) pipeline, and both inputFile and pubsub topic are defined, - * start an 'injector' pipeline that publishes the contents of the file to the given topic, first - * creating the topic if necessary. - */ - public void startInjectorIfNeeded(String inputFile) { - ExamplePubsubTopicOptions pubsubTopicOptions = options.as(ExamplePubsubTopicOptions.class); - if (pubsubTopicOptions.isStreaming() - && inputFile != null && !inputFile.isEmpty() - && pubsubTopicOptions.getPubsubTopic() != null - && !pubsubTopicOptions.getPubsubTopic().isEmpty()) { - runInjectorPipeline(inputFile, pubsubTopicOptions.getPubsubTopic()); - } - } - - /** - * Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with - * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming - * flag value. - */ - public void setupRunner() { - if (options.isStreaming()) { - if (options.getRunner() == DirectPipelineRunner.class) { - throw new IllegalArgumentException( - "Processing of unbounded input sources is not supported with the DirectPipelineRunner."); - } - // In order to cancel the pipelines automatically, - // {@literal DataflowPipelineRunner} is forced to be used. - options.setRunner(DataflowPipelineRunner.class); - } - } - - /** - * Runs the batch injector for the streaming pipeline. - * - * <p>The injector pipeline will read from the given text file, and inject data - * into the Google Cloud Pub/Sub topic. 
- */ - public void runInjectorPipeline(String inputFile, String topic) { - DataflowPipelineOptions copiedOptions = options.cloneAs(DataflowPipelineOptions.class); - copiedOptions.setStreaming(false); - copiedOptions.setNumWorkers( - options.as(ExamplePubsubTopicOptions.class).getInjectorNumWorkers()); - copiedOptions.setJobName(options.getJobName() + "-injector"); - Pipeline injectorPipeline = Pipeline.create(copiedOptions); - injectorPipeline.apply(TextIO.Read.from(inputFile)) - .apply(IntraBundleParallelization - .of(PubsubFileInjector.publish(topic)) - .withMaxParallelism(20)); - DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run(); - jobsToCancel.add(injectorJob); - } - - /** - * Runs the provided injector pipeline for the streaming pipeline. - */ - public void runInjectorPipeline(Pipeline injectorPipeline) { - DataflowPipelineJob injectorJob = (DataflowPipelineJob) injectorPipeline.run(); - jobsToCancel.add(injectorJob); - } - - /** - * Start the auxiliary injector pipeline, then wait for this pipeline to finish. - */ - public void mockUnboundedSource(String inputFile, PipelineResult result) { - startInjectorIfNeeded(inputFile); - waitToFinish(result); - } - - /** - * If {@literal DataflowPipelineRunner} or {@literal BlockingDataflowPipelineRunner} is used, - * waits for the pipeline to finish and cancels it (and the injector) before the program exists. 
- */ - public void waitToFinish(PipelineResult result) { - if (result instanceof DataflowPipelineJob) { - final DataflowPipelineJob job = (DataflowPipelineJob) result; - jobsToCancel.add(job); - if (!options.as(DataflowExampleOptions.class).getKeepJobsRunning()) { - addShutdownHook(jobsToCancel); - } - try { - job.waitToFinish(-1, TimeUnit.SECONDS, new MonitoringUtil.PrintHandler(System.out)); - } catch (Exception e) { - throw new RuntimeException("Failed to wait for job to finish: " + job.getJobId()); - } - } else { - // Do nothing if the given PipelineResult doesn't support waitToFinish(), - // such as EvaluationResults returned by DirectPipelineRunner. - } - } - - private void addShutdownHook(final Collection<DataflowPipelineJob> jobs) { - if (dataflowClient == null) { - dataflowClient = options.getDataflowClient(); - } - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - tearDown(); - printPendingMessages(); - for (DataflowPipelineJob job : jobs) { - System.out.println("Canceling example pipeline: " + job.getJobId()); - try { - job.cancel(); - } catch (IOException e) { - System.out.println("Failed to cancel the job," - + " please go to the Developers Console to cancel it manually"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - - for (DataflowPipelineJob job : jobs) { - boolean cancellationVerified = false; - for (int retryAttempts = 6; retryAttempts > 0; retryAttempts--) { - if (job.getState().isTerminal()) { - cancellationVerified = true; - System.out.println("Canceled example pipeline: " + job.getJobId()); - break; - } else { - System.out.println( - "The example pipeline is still running. 
Verifying the cancellation."); - } - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - // Ignore - } - } - if (!cancellationVerified) { - System.out.println("Failed to verify the cancellation for job: " + job.getJobId()); - System.out.println("Please go to the Developers Console to verify manually:"); - System.out.println( - MonitoringUtil.getJobMonitoringPageURL(job.getProjectId(), job.getJobId())); - } - } - } - }); - } - - private void printPendingMessages() { - System.out.println(); - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - for (String message : pendingMessages) { - System.out.println(message); - } - System.out.println("***********************************************************"); - System.out.println("***********************************************************"); - } - - private static <T> T executeNullIfNotFound( - AbstractGoogleClientRequest<T> request) throws IOException { - try { - return request.execute(); - } catch (GoogleJsonResponseException e) { - if (e.getStatusCode() == HttpServletResponse.SC_NOT_FOUND) { - return null; - } else { - throw e; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java deleted file mode 100644 index bef5bfd..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExampleBigQueryTableOptions.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}.common; - -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure BigQuery tables in Dataflow examples. - * The project defaults to the project being used to run the example. - */ -public interface ExampleBigQueryTableOptions extends DataflowPipelineOptions { - @Description("BigQuery dataset name") - @Default.String("dataflow_examples") - String getBigQueryDataset(); - void setBigQueryDataset(String dataset); - - @Description("BigQuery table name") - @Default.InstanceFactory(BigQueryTableFactory.class) - String getBigQueryTable(); - void setBigQueryTable(String table); - - @Description("BigQuery table schema") - TableSchema getBigQuerySchema(); - void setBigQuerySchema(TableSchema schema); - - /** - * Returns the job name as the default BigQuery table name. 
- */ - static class BigQueryTableFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - return options.as(DataflowPipelineOptions.class).getJobName() - .replace('-', '_'); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java deleted file mode 100644 index 525de69..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/ExamplePubsubTopicOptions.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}.common; - -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; - -/** - * Options that can be used to configure Pub/Sub topic in Dataflow examples. 
- */ -public interface ExamplePubsubTopicOptions extends DataflowPipelineOptions { - @Description("Pub/Sub topic") - @Default.InstanceFactory(PubsubTopicFactory.class) - String getPubsubTopic(); - void setPubsubTopic(String topic); - - @Description("Number of workers to use when executing the injector pipeline") - @Default.Integer(1) - int getInjectorNumWorkers(); - void setInjectorNumWorkers(int numWorkers); - - /** - * Returns a default Pub/Sub topic based on the project and the job names. - */ - static class PubsubTopicFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowPipelineOptions = - options.as(DataflowPipelineOptions.class); - return "projects/" + dataflowPipelineOptions.getProject() - + "/topics/" + dataflowPipelineOptions.getJobName(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java deleted file mode 100644 index f6f80ae..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}.common; - -import com.google.api.services.pubsub.Pubsub; -import com.google.api.services.pubsub.model.PublishRequest; -import com.google.api.services.pubsub.model.PubsubMessage; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.IntraBundleParallelization; -import com.google.cloud.dataflow.sdk.util.Transport; -import com.google.common.collect.ImmutableMap; - -import java.io.IOException; -import java.util.Arrays; - -/** - * A batch Dataflow pipeline for injecting a set of GCS files into - * a PubSub topic line by line. Empty lines are skipped. - * - * <p>This is useful for testing streaming - * pipelines. Note that since batch pipelines might retry chunks, this - * does _not_ guarantee exactly-once injection of file data. Some lines may - * be published multiple times. - * </p> - */ -public class PubsubFileInjector { - - /** - * An incomplete {@code PubsubFileInjector} transform with unbound output topic. 
- */ - public static class Unbound { - private final String timestampLabelKey; - - Unbound() { - this.timestampLabelKey = null; - } - - Unbound(String timestampLabelKey) { - this.timestampLabelKey = timestampLabelKey; - } - - Unbound withTimestampLabelKey(String timestampLabelKey) { - return new Unbound(timestampLabelKey); - } - - public Bound publish(String outputTopic) { - return new Bound(outputTopic, timestampLabelKey); - } - } - - /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */ - public static class Bound extends DoFn<String, Void> { - private final String outputTopic; - private final String timestampLabelKey; - public transient Pubsub pubsub; - - public Bound(String outputTopic, String timestampLabelKey) { - this.outputTopic = outputTopic; - this.timestampLabelKey = timestampLabelKey; - } - - @Override - public void startBundle(Context context) { - this.pubsub = - Transport.newPubsubClient(context.getPipelineOptions().as(DataflowPipelineOptions.class)) - .build(); - } - - @Override - public void processElement(ProcessContext c) throws IOException { - if (c.element().isEmpty()) { - return; - } - PubsubMessage pubsubMessage = new PubsubMessage(); - pubsubMessage.encodeData(c.element().getBytes()); - if (timestampLabelKey != null) { - pubsubMessage.setAttributes( - ImmutableMap.of(timestampLabelKey, Long.toString(c.timestamp().getMillis()))); - } - PublishRequest publishRequest = new PublishRequest(); - publishRequest.setMessages(Arrays.asList(pubsubMessage)); - this.pubsub.projects().topics().publish(outputTopic, publishRequest).execute(); - } - } - - /** - * Creates a {@code PubsubFileInjector} transform with the given timestamp label key. - */ - public static Unbound withTimestampLabelKey(String timestampLabelKey) { - return new Unbound(timestampLabelKey); - } - - /** - * Creates a {@code PubsubFileInjector} transform that publishes to the given output topic. 
- */ - public static Bound publish(String outputTopic) { - return new Unbound().publish(outputTopic); - } - - /** - * Command line parameter options. - */ - private interface PubsubFileInjectorOptions extends PipelineOptions { - @Description("GCS location of files.") - @Validation.Required - String getInput(); - void setInput(String value); - - @Description("Topic to publish on.") - @Validation.Required - String getOutputTopic(); - void setOutputTopic(String value); - } - - /** - * Sets up and starts streaming pipeline. - */ - public static void main(String[] args) { - PubsubFileInjectorOptions options = PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(PubsubFileInjectorOptions.class); - - Pipeline pipeline = Pipeline.create(options); - - pipeline - .apply(TextIO.Read.from(options.getInput())) - .apply(IntraBundleParallelization.of(PubsubFileInjector.publish(options.getOutputTopic())) - .withMaxParallelism(20)); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java deleted file mode 100644 index 7a9aa4c..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/DebuggingWordCountTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}; - -import com.google.common.io.Files; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; -import java.nio.charset.StandardCharsets; - -/** - * Tests for {@link DebuggingWordCount}. - */ -@RunWith(JUnit4.class) -public class DebuggingWordCountTest { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - - @Test - public void testDebuggingWordCount() throws Exception { - File file = tmpFolder.newFile(); - Files.write("stomach secret Flourish message Flourish here Flourish", file, - StandardCharsets.UTF_8); - DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()}); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java b/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java deleted file mode 100644 index 45555ce..0000000 --- a/maven-archetypes/examples/src/main/resources/archetype-resources/src/test/java/WordCountTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package ${package}; - -import ${package}.WordCount.CountWords; -import ${package}.WordCount.ExtractWordsFn; -import ${package}.WordCount.FormatAsTextFn; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -/** - * Tests of WordCount. - */ -@RunWith(JUnit4.class) -public class WordCountTest { - - /** Example test that tests a specific DoFn. 
*/ - @Test - public void testExtractWordsFn() { - DoFnTester<String, String> extractWordsFn = - DoFnTester.of(new ExtractWordsFn()); - - Assert.assertThat(extractWordsFn.processBatch(" some input words "), - CoreMatchers.hasItems("some", "input", "words")); - Assert.assertThat(extractWordsFn.processBatch(" "), - CoreMatchers.<String>hasItems()); - Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"), - CoreMatchers.hasItems("some", "input", "words")); - } - - static final String[] WORDS_ARRAY = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - static final String[] COUNTS_ARRAY = new String[] { - "hi: 5", "there: 1", "sue: 2", "bob: 2"}; - - /** Example test that tests a PTransform by using an in-memory input and inspecting the output. */ - @Test - @Category(RunnableOnService.class) - public void testCountWords() throws Exception { - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); - - PCollection<String> output = input.apply(new CountWords()) - .apply(ParDo.of(new FormatAsTextFn())); - - DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/test/resources/projects/basic/archetype.properties ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/test/resources/projects/basic/archetype.properties b/maven-archetypes/examples/src/test/resources/projects/basic/archetype.properties deleted file mode 100644 index c59e77a..0000000 --- a/maven-archetypes/examples/src/test/resources/projects/basic/archetype.properties +++ /dev/null @@ -1,5 +0,0 @@ -package=it.pkg -version=0.1-SNAPSHOT -groupId=archetype.it -artifactId=basic -targetPlatform=1.7 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/257a7a6b/maven-archetypes/examples/src/test/resources/projects/basic/goal.txt ---------------------------------------------------------------------- diff --git a/maven-archetypes/examples/src/test/resources/projects/basic/goal.txt b/maven-archetypes/examples/src/test/resources/projects/basic/goal.txt deleted file mode 100644 index 0b59873..0000000 --- a/maven-archetypes/examples/src/test/resources/projects/basic/goal.txt +++ /dev/null @@ -1 +0,0 @@ -verify
