[flink] restructure and cleanup Maven layout - move examples to separate Maven project - remove unused dependencies - simplify Maven configuration - introduce Scala suffixes to modules - unify indention
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/071e4dd6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/071e4dd6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/071e4dd6 Branch: refs/heads/master Commit: 071e4dd67021346b0cab2aafa0900ec7e34c4ef8 Parents: c0b9fc6 Author: Maximilian Michels <[email protected]> Authored: Tue Mar 15 16:03:14 2016 +0100 Committer: Maximilian Michels <[email protected]> Committed: Tue Mar 15 17:05:15 2016 +0100 ---------------------------------------------------------------------- runners/flink/README.md | 101 +-- runners/flink/examples/pom.xml | 89 +++ .../beam/runners/flink/examples/TFIDF.java | 452 ++++++++++++ .../beam/runners/flink/examples/WordCount.java | 113 +++ .../flink/examples/streaming/AutoComplete.java | 387 ++++++++++ .../flink/examples/streaming/JoinExamples.java | 158 ++++ .../KafkaWindowedWordCountExample.java | 143 ++++ .../examples/streaming/WindowedWordCount.java | 130 ++++ runners/flink/pom.xml | 416 +++++------ runners/flink/runner/pom.xml | 145 ++++ .../FlinkPipelineExecutionEnvironment.java | 269 +++++++ .../runners/flink/FlinkPipelineOptions.java | 93 +++ .../beam/runners/flink/FlinkPipelineRunner.java | 206 ++++++ .../beam/runners/flink/FlinkRunnerResult.java | 68 ++ .../apache/beam/runners/flink/io/ConsoleIO.java | 82 +++ .../FlinkBatchPipelineTranslator.java | 153 ++++ .../FlinkBatchTransformTranslators.java | 594 +++++++++++++++ .../FlinkBatchTranslationContext.java | 129 ++++ .../translation/FlinkPipelineTranslator.java | 36 + .../FlinkStreamingPipelineTranslator.java | 150 ++++ .../FlinkStreamingTransformTranslators.java | 406 +++++++++++ .../FlinkStreamingTranslationContext.java | 89 +++ .../FlinkCoGroupKeyedListAggregator.java | 60 ++ .../functions/FlinkCreateFunction.java | 62 ++ .../functions/FlinkDoFnFunction.java | 204 ++++++ .../FlinkKeyedListAggregationFunction.java | 77 ++ 
.../functions/FlinkMultiOutputDoFnFunction.java | 177 +++++ .../FlinkMultiOutputPruningFunction.java | 43 ++ .../functions/FlinkPartialReduceFunction.java | 60 ++ .../functions/FlinkReduceFunction.java | 57 ++ .../flink/translation/functions/UnionCoder.java | 150 ++++ .../translation/types/CoderComparator.java | 216 ++++++ .../translation/types/CoderTypeInformation.java | 116 +++ .../translation/types/CoderTypeSerializer.java | 152 ++++ .../types/InspectableByteArrayOutputStream.java | 34 + .../translation/types/KvCoderComperator.java | 264 +++++++ .../types/KvCoderTypeInformation.java | 186 +++++ .../types/VoidCoderTypeSerializer.java | 112 +++ .../wrappers/CombineFnAggregatorWrapper.java | 92 +++ .../wrappers/DataInputViewWrapper.java | 59 ++ .../wrappers/DataOutputViewWrapper.java | 52 ++ .../SerializableFnAggregatorWrapper.java | 91 +++ .../translation/wrappers/SinkOutputFormat.java | 121 ++++ .../translation/wrappers/SourceInputFormat.java | 164 +++++ .../translation/wrappers/SourceInputSplit.java | 52 ++ .../streaming/FlinkAbstractParDoWrapper.java | 266 +++++++ .../FlinkGroupAlsoByWindowWrapper.java | 631 ++++++++++++++++ .../streaming/FlinkGroupByKeyWrapper.java | 66 ++ .../streaming/FlinkParDoBoundMultiWrapper.java | 77 ++ .../streaming/FlinkParDoBoundWrapper.java | 100 +++ .../io/FlinkStreamingCreateFunction.java | 65 ++ .../streaming/io/UnboundedFlinkSource.java | 82 +++ .../streaming/io/UnboundedSocketSource.java | 233 ++++++ .../streaming/io/UnboundedSourceWrapper.java | 134 ++++ .../state/AbstractFlinkTimerInternals.java | 128 ++++ .../streaming/state/FlinkStateInternals.java | 715 +++++++++++++++++++ .../streaming/state/StateCheckpointReader.java | 91 +++ .../streaming/state/StateCheckpointUtils.java | 155 ++++ .../streaming/state/StateCheckpointWriter.java | 129 ++++ .../wrappers/streaming/state/StateType.java | 73 ++ .../runner/src/main/resources/log4j.properties | 23 + .../apache/beam/runners/flink/AvroITCase.java | 127 ++++ 
.../beam/runners/flink/FlattenizeITCase.java | 74 ++ .../beam/runners/flink/FlinkTestPipeline.java | 72 ++ .../beam/runners/flink/JoinExamplesITCase.java | 101 +++ .../runners/flink/MaybeEmptyTestITCase.java | 65 ++ .../runners/flink/ParDoMultiOutputITCase.java | 100 +++ .../beam/runners/flink/ReadSourceITCase.java | 165 +++++ .../flink/RemoveDuplicatesEmptyITCase.java | 70 ++ .../runners/flink/RemoveDuplicatesITCase.java | 71 ++ .../beam/runners/flink/SideInputITCase.java | 69 ++ .../apache/beam/runners/flink/TfIdfITCase.java | 78 ++ .../beam/runners/flink/WordCountITCase.java | 75 ++ .../runners/flink/WordCountJoin2ITCase.java | 138 ++++ .../runners/flink/WordCountJoin3ITCase.java | 156 ++++ .../beam/runners/flink/WriteSinkITCase.java | 158 ++++ .../flink/streaming/GroupAlsoByWindowTest.java | 508 +++++++++++++ .../flink/streaming/GroupByNullKeyTest.java | 123 ++++ .../flink/streaming/StateSerializationTest.java | 305 ++++++++ .../streaming/TopWikipediaSessionsITCase.java | 134 ++++ .../beam/runners/flink/util/JoinExamples.java | 160 +++++ .../src/test/resources/log4j-test.properties | 27 + .../FlinkPipelineExecutionEnvironment.java | 269 ------- .../runners/flink/FlinkPipelineOptions.java | 93 --- .../beam/runners/flink/FlinkPipelineRunner.java | 206 ------ .../beam/runners/flink/FlinkRunnerResult.java | 68 -- .../beam/runners/flink/examples/TFIDF.java | 452 ------------ .../beam/runners/flink/examples/WordCount.java | 113 --- .../flink/examples/streaming/AutoComplete.java | 387 ---------- .../flink/examples/streaming/JoinExamples.java | 158 ---- .../KafkaWindowedWordCountExample.java | 143 ---- .../examples/streaming/WindowedWordCount.java | 130 ---- .../apache/beam/runners/flink/io/ConsoleIO.java | 82 --- .../FlinkBatchPipelineTranslator.java | 153 ---- .../FlinkBatchTransformTranslators.java | 594 --------------- .../FlinkBatchTranslationContext.java | 129 ---- .../translation/FlinkPipelineTranslator.java | 36 - .../FlinkStreamingPipelineTranslator.java | 150 
---- .../FlinkStreamingTransformTranslators.java | 406 ----------- .../FlinkStreamingTranslationContext.java | 89 --- .../FlinkCoGroupKeyedListAggregator.java | 60 -- .../functions/FlinkCreateFunction.java | 62 -- .../functions/FlinkDoFnFunction.java | 204 ------ .../FlinkKeyedListAggregationFunction.java | 77 -- .../functions/FlinkMultiOutputDoFnFunction.java | 177 ----- .../FlinkMultiOutputPruningFunction.java | 43 -- .../functions/FlinkPartialReduceFunction.java | 60 -- .../functions/FlinkReduceFunction.java | 57 -- .../flink/translation/functions/UnionCoder.java | 150 ---- .../translation/types/CoderComparator.java | 216 ------ .../translation/types/CoderTypeInformation.java | 116 --- .../translation/types/CoderTypeSerializer.java | 152 ---- .../types/InspectableByteArrayOutputStream.java | 34 - .../translation/types/KvCoderComperator.java | 264 ------- .../types/KvCoderTypeInformation.java | 186 ----- .../types/VoidCoderTypeSerializer.java | 112 --- .../wrappers/CombineFnAggregatorWrapper.java | 92 --- .../wrappers/DataInputViewWrapper.java | 59 -- .../wrappers/DataOutputViewWrapper.java | 52 -- .../SerializableFnAggregatorWrapper.java | 91 --- .../translation/wrappers/SinkOutputFormat.java | 121 ---- .../translation/wrappers/SourceInputFormat.java | 164 ----- .../translation/wrappers/SourceInputSplit.java | 52 -- .../streaming/FlinkAbstractParDoWrapper.java | 266 ------- .../FlinkGroupAlsoByWindowWrapper.java | 631 ---------------- .../streaming/FlinkGroupByKeyWrapper.java | 66 -- .../streaming/FlinkParDoBoundMultiWrapper.java | 77 -- .../streaming/FlinkParDoBoundWrapper.java | 100 --- .../io/FlinkStreamingCreateFunction.java | 65 -- .../streaming/io/UnboundedFlinkSource.java | 82 --- .../streaming/io/UnboundedSocketSource.java | 233 ------ .../streaming/io/UnboundedSourceWrapper.java | 134 ---- .../state/AbstractFlinkTimerInternals.java | 128 ---- .../streaming/state/FlinkStateInternals.java | 715 ------------------- 
.../streaming/state/StateCheckpointReader.java | 91 --- .../streaming/state/StateCheckpointUtils.java | 155 ---- .../streaming/state/StateCheckpointWriter.java | 129 ---- .../wrappers/streaming/state/StateType.java | 73 -- .../flink/src/main/resources/log4j.properties | 23 - .../apache/beam/runners/flink/AvroITCase.java | 127 ---- .../beam/runners/flink/FlattenizeITCase.java | 74 -- .../beam/runners/flink/FlinkTestPipeline.java | 72 -- .../beam/runners/flink/JoinExamplesITCase.java | 101 --- .../runners/flink/MaybeEmptyTestITCase.java | 65 -- .../runners/flink/ParDoMultiOutputITCase.java | 100 --- .../beam/runners/flink/ReadSourceITCase.java | 165 ----- .../flink/RemoveDuplicatesEmptyITCase.java | 70 -- .../runners/flink/RemoveDuplicatesITCase.java | 71 -- .../beam/runners/flink/SideInputITCase.java | 69 -- .../apache/beam/runners/flink/TfIdfITCase.java | 78 -- .../beam/runners/flink/WordCountITCase.java | 76 -- .../runners/flink/WordCountJoin2ITCase.java | 138 ---- .../runners/flink/WordCountJoin3ITCase.java | 156 ---- .../beam/runners/flink/WriteSinkITCase.java | 158 ---- .../flink/streaming/GroupAlsoByWindowTest.java | 508 ------------- .../flink/streaming/GroupByNullKeyTest.java | 123 ---- .../flink/streaming/StateSerializationTest.java | 305 -------- .../streaming/TopWikipediaSessionsITCase.java | 134 ---- .../beam/runners/flink/util/JoinExamples.java | 160 ----- .../src/test/resources/log4j-test.properties | 27 - runners/pom.xml | 62 +- 161 files changed, 12480 insertions(+), 12340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/README.md ---------------------------------------------------------------------- diff --git a/runners/flink/README.md b/runners/flink/README.md index 8d71680..7418f16 100644 --- a/runners/flink/README.md +++ b/runners/flink/README.md @@ -84,7 +84,7 @@ Flink-Runner is now installed in your local maven repository. 
## Executing an example Next, let's run the classic WordCount example. It's semantically identically to -the example provided with ApacheBeam. Only this time, we chose the +the example provided with Apache Beam. Only this time, we chose the `FlinkPipelineRunner` to execute the WordCount on top of Flink. Here's an excerpt from the WordCount class file: @@ -107,13 +107,14 @@ p.run(); To execute the example, let's first get some sample data: - curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt + curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > examples/kinglear.txt Then let's run the included WordCount locally on your machine: + cd examples mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt -Congratulations, you have run your first ApacheBeam program on top of Apache Flink! +Congratulations, you have run your first Apache Beam program on top of Apache Flink! # Running Beam programs on a Flink cluster @@ -129,53 +130,53 @@ The contents of the root `pom.xml` should be slightly changed aftewards (explana ```xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <groupId>com.mycompany.beam</groupId> - <artifactId>beam-test</artifactId> - <version>1.0</version> - - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>flink-runner</artifactId> - <version>0.3-SNAPSHOT</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.4.1</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - <mainClass>org.apache.beam.runners.flink.examples.WordCount</mainClass> - </transformer> - </transformers> - <artifactSet> - <excludes> - <exclude>org.apache.flink:*</exclude> - </excludes> - </artifactSet> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.mycompany.beam</groupId> + <artifactId>beam-test</artifactId> + <version>1.0</version> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>flink-runner_2.10</artifactId> + <version>0.4-SNAPSHOT</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.beam.runners.flink.examples.WordCount</mainClass> + </transformer> + </transformers> + <artifactSet> + <excludes> + <exclude>org.apache.flink:*</exclude> + </excludes> + </artifactSet> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + + </build> </project> ``` http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml new file mode 100644 index 0000000..91cc1b7 --- /dev/null +++ b/runners/flink/examples/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache 
Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>flink-runner-parent</artifactId> + <version>0.4-SNAPSHOT</version> + </parent> + + <artifactId>flink-runner-examples_2.10</artifactId> + <version>0.4-SNAPSHOT</version> + + <name>Flink Beam Runner Examples</name> + <packaging>jar</packaging> + + <inceptionYear>2015</inceptionYear> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <properties> + <!-- Default parameters for mvn exec:exec --> + <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz> + <input>kinglear.txt</input> + <output>wordcounts.txt</output> + <parallelism>1</parallelism> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>flink-runner_2.10</artifactId> + <version>${project.version}</version> + </dependency> + + </dependencies> + + 
<build> + <plugins> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <configuration> + <executable>java</executable> + <arguments> + <argument>-classpath</argument> + <classpath/> + <argument>${clazz}</argument> + <argument>--input=${input}</argument> + <argument>--output=${output}</argument> + <argument>--parallelism=${parallelism}</argument> + </arguments> + </configuration> + </plugin> + + </plugins> + + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java new file mode 100644 index 0000000..ab23b92 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -0,0 +1,452 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.beam.runners.flink.examples; + +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.GcsOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.Validation; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.transforms.Keys; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.util.GcsUtil; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; +import 
com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PDone; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Set; + +/** + * An example that computes a basic TF-IDF search table for a directory or GCS prefix. + * + * <p> Concepts: joining data; side inputs; logging + * + * <p> To execute this pipeline locally, specify general pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * }</pre> + * and a local output file or output prefix on GCS: + * <pre>{@code + * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] + * }</pre> + * + * <p> To execute this pipeline using the Dataflow service, specify pipeline configuration: + * <pre>{@code + * --project=YOUR_PROJECT_ID + * --stagingLocation=gs://YOUR_STAGING_DIRECTORY + * --runner=BlockingDataflowPipelineRunner + * and an output prefix on GCS: + * --output=gs://YOUR_OUTPUT_PREFIX + * }</pre> + * + * <p> The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with + * {@code --input}. + */ +public class TFIDF { + /** + * Options supported by {@link TFIDF}. + * <p> + * Inherits standard configuration options. + */ + private interface Options extends PipelineOptions, FlinkPipelineOptions { + @Description("Path to the directory or GCS prefix containing files to read from") + @Default.String("gs://dataflow-samples/shakespeare/") + String getInput(); + void setInput(String value); + + @Description("Prefix of output URI to write to") + @Validation.Required + String getOutput(); + void setOutput(String value); + } + + /** + * Lists documents contained beneath the {@code options.input} prefix/directory. 
+ */ + public static Set<URI> listInputDocuments(Options options) + throws URISyntaxException, IOException { + URI baseUri = new URI(options.getInput()); + + // List all documents in the directory or GCS prefix. + URI absoluteUri; + if (baseUri.getScheme() != null) { + absoluteUri = baseUri; + } else { + absoluteUri = new URI( + "file", + baseUri.getAuthority(), + baseUri.getPath(), + baseUri.getQuery(), + baseUri.getFragment()); + } + + Set<URI> uris = new HashSet<>(); + if (absoluteUri.getScheme().equals("file")) { + File directory = new File(absoluteUri); + for (String entry : directory.list()) { + File path = new File(directory, entry); + uris.add(path.toURI()); + } + } else if (absoluteUri.getScheme().equals("gs")) { + GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); + URI gcsUriGlob = new URI( + absoluteUri.getScheme(), + absoluteUri.getAuthority(), + absoluteUri.getPath() + "*", + absoluteUri.getQuery(), + absoluteUri.getFragment()); + for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { + uris.add(entry.toUri()); + } + } + + return uris; + } + + /** + * Reads the documents at the provided uris and returns all lines + * from the documents tagged with which document they are from. 
+ */ + public static class ReadDocuments + extends PTransform<PInput, PCollection<KV<URI, String>>> { + private static final long serialVersionUID = 0; + + private Iterable<URI> uris; + + public ReadDocuments(Iterable<URI> uris) { + this.uris = uris; + } + + @Override + public Coder<?> getDefaultOutputCoder() { + return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); + } + + @Override + public PCollection<KV<URI, String>> apply(PInput input) { + Pipeline pipeline = input.getPipeline(); + + // Create one TextIO.Read transform for each document + // and add its output to a PCollectionList + PCollectionList<KV<URI, String>> urisToLines = + PCollectionList.empty(pipeline); + + // TextIO.Read supports: + // - file: URIs and paths locally + // - gs: URIs on the service + for (final URI uri : uris) { + String uriString; + if (uri.getScheme().equals("file")) { + uriString = new File(uri).getPath(); + } else { + uriString = uri.toString(); + } + + PCollection<KV<URI, String>> oneUriToLines = pipeline + .apply(TextIO.Read.from(uriString) + .named("TextIO.Read(" + uriString + ")")) + .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); + + urisToLines = urisToLines.and(oneUriToLines); + } + + return urisToLines.apply(Flatten.<KV<URI, String>>pCollections()); + } + } + + /** + * A transform containing a basic TF-IDF pipeline. The input consists of KV objects + * where the key is the document's URI and the value is a piece + * of the document's content. The output is mapping from terms to + * scores for each document URI. 
+ */ + public static class ComputeTfIdf + extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> { + private static final long serialVersionUID = 0; + + public ComputeTfIdf() { } + + @Override + public PCollection<KV<String, KV<URI, Double>>> apply( + PCollection<KV<URI, String>> uriToContent) { + + // Compute the total number of documents, and + // prepare this singleton PCollectionView for + // use as a side input. + final PCollectionView<Long> totalDocuments = + uriToContent + .apply("GetURIs", Keys.<URI>create()) + .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create()) + .apply(Count.<URI>globally()) + .apply(View.<Long>asSingleton()); + + // Create a collection of pairs mapping a URI to each + // of the words in the document associated with that that URI. + PCollection<KV<URI, String>> uriToWords = uriToContent + .apply(ParDo.named("SplitWords").of( + new DoFn<KV<URI, String>, KV<URI, String>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + String line = c.element().getValue(); + for (String word : line.split("\\W+")) { + // Log INFO messages when the word “love” is found. + if (word.toLowerCase().equals("love")) { + LOG.info("Found {}", word.toLowerCase()); + } + + if (!word.isEmpty()) { + c.output(KV.of(uri, word.toLowerCase())); + } + } + } + })); + + // Compute a mapping from each word to the total + // number of documents in which it appears. + PCollection<KV<String, Long>> wordToDocCount = uriToWords + .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create()) + .apply(Values.<String>create()) + .apply("CountDocs", Count.<String>perElement()); + + // Compute a mapping from each URI to the total + // number of words in the document associated with that URI. 
+ PCollection<KV<URI, Long>> uriToWordTotal = uriToWords + .apply("GetURIs2", Keys.<URI>create()) + .apply("CountWords", Count.<URI>perElement()); + + // Count, for each (URI, word) pair, the number of + // occurrences of that word in the document associated + // with the URI. + PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords + .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement()); + + // Adjust the above collection to a mapping from + // (URI, word) pairs to counts into an isomorphic mapping + // from URI to (word, count) pairs, to prepare for a join + // by the URI key. + PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount + .apply(ParDo.named("ShiftKeys").of( + new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey().getKey(); + String word = c.element().getKey().getValue(); + Long occurrences = c.element().getValue(); + c.output(KV.of(uri, KV.of(word, occurrences))); + } + })); + + // Prepare to join the mapping of URI to (word, count) pairs with + // the mapping of URI to total word counts, by associating + // each of the input PCollection<KV<URI, ...>> with + // a tuple tag. Each input must have the same key type, URI + // in this case. The type parameter of the tuple tag matches + // the types of the values for each collection. + final TupleTag<Long> wordTotalsTag = new TupleTag<>(); + final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>(); + KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple + .of(wordTotalsTag, uriToWordTotal) + .and(wordCountsTag, uriToWordAndCount); + + // Perform a CoGroupByKey (a sort of pre-join) on the prepared + // inputs. This yields a mapping from URI to a CoGbkResult + // (CoGroupByKey Result). 
The CoGbkResult is a mapping + // from the above tuple tags to the values in each input + // associated with a particular URI. In this case, each + // KV<URI, CoGbkResult> group a URI with the total number of + // words in that document as well as all the (word, count) + // pairs for particular words. + PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput + .apply("CoGroupByUri", CoGroupByKey.<URI>create()); + + // Compute a mapping from each word to a (URI, term frequency) + // pair for each URI. A word's term frequency for a document + // is simply the number of times that word occurs in the document + // divided by the total number of words in the document. + PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal + .apply(ParDo.named("ComputeTermFrequencies").of( + new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); + + for (KV<String, Long> wordAndCount + : c.element().getValue().getAll(wordCountsTag)) { + String word = wordAndCount.getKey(); + Long wordCount = wordAndCount.getValue(); + Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); + c.output(KV.of(word, KV.of(uri, termFrequency))); + } + } + })); + + // Compute a mapping from each word to its document frequency. + // A word's document frequency in a corpus is the number of + // documents in which the word appears divided by the total + // number of documents in the corpus. Note how the total number of + // documents is passed as a side input; the same value is + // presented to each invocation of the DoFn. 
+ PCollection<KV<String, Double>> wordToDf = wordToDocCount + .apply(ParDo + .named("ComputeDocFrequencies") + .withSideInputs(totalDocuments) + .of(new DoFn<KV<String, Long>, KV<String, Double>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String word = c.element().getKey(); + Long documentCount = c.element().getValue(); + Long documentTotal = c.sideInput(totalDocuments); + Double documentFrequency = documentCount.doubleValue() + / documentTotal.doubleValue(); + + c.output(KV.of(word, documentFrequency)); + } + })); + + // Join the term frequency and document frequency + // collections, each keyed on the word. + final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>(); + final TupleTag<Double> dfTag = new TupleTag<>(); + PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple + .of(tfTag, wordToUriAndTf) + .and(dfTag, wordToDf) + .apply(CoGroupByKey.<String>create()); + + // Compute a mapping from each word to a (URI, TF-IDF) score + // for each URI. There are a variety of definitions of TF-IDF + // ("term frequency - inverse document frequency") score; + // here we use a basic version that is the term frequency + // divided by the log of the document frequency. + + return wordToUriAndTfAndDf + .apply(ParDo.named("ComputeTfIdf").of( + new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + private static final long serialVersionUID1 = 0; + + @Override + public void processElement(ProcessContext c) { + String word = c.element().getKey(); + Double df = c.element().getValue().getOnly(dfTag); + + for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) { + URI uri = uriAndTf.getKey(); + Double tf = uriAndTf.getValue(); + Double tfIdf = tf * Math.log(1 / df); + c.output(KV.of(word, KV.of(uri, tfIdf))); + } + } + })); + } + + // Instantiate Logger. 
+ // It is suggested that the user specify the class name of the containing class + // (in this case ComputeTfIdf). + private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); + } + + /** + * A {@link PTransform} to write, in CSV format, a mapping from term and URI + * to score. + */ + public static class WriteTfIdf + extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> { + private static final long serialVersionUID = 0; + + private String output; + + public WriteTfIdf(String output) { + this.output = output; + } + + @Override + public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { + return wordToUriAndTfIdf + .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + c.output(String.format("%s,\t%s,\t%f", + c.element().getKey(), + c.element().getValue().getKey(), + c.element().getValue().getValue())); + } + })) + .apply(TextIO.Write + .to(output) + .withSuffix(".csv")); + } + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + options.setRunner(FlinkPipelineRunner.class); + + Pipeline pipeline = Pipeline.create(options); + pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); + + pipeline + .apply(new ReadDocuments(listInputDocuments(options))) + .apply(new ComputeTfIdf()) + .apply(new WriteTfIdf(options.getOutput())); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java new file mode 100644 index 0000000..7d12fed --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.examples; + +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +public class WordCount { + + public static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static class CountWords extends PTransform<PCollection<String>, + PCollection<KV<String, Long>>> { + @Override + public PCollection<KV<String, Long>> apply(PCollection<String> lines) { + + // Convert lines of text into individual words. + PCollection<String> words = lines.apply( + ParDo.of(new ExtractWordsFn())); + + // Count the number of times each word occurs. + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + return wordCounts; + } + } + + /** A SimpleFunction that converts a Word and Count into a printable string. 
*/ + public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { + @Override + public String apply(KV<String, Long> input) { + return input.getKey() + ": " + input.getValue(); + } + } + + /** + * Options supported by {@link WordCount}. + * <p> + * Inherits standard configuration options. + */ + public interface Options extends PipelineOptions, FlinkPipelineOptions { + @Description("Path of the file to read from") + @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") + String getInput(); + void setInput(String value); + + @Description("Path of the file to write to") + String getOutput(); + void setOutput(String value); + } + + public static void main(String[] args) { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation() + .as(Options.class); + options.setRunner(FlinkPipelineRunner.class); + + Pipeline p = Pipeline.create(options); + + p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + .apply(new CountWords()) + .apply(MapElements.via(new FormatAsTextFn())) + .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); + + p.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java new file mode 100644 index 0000000..8168122 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -0,0 +1,387 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.AvroCoder; +import com.google.cloud.dataflow.sdk.coders.DefaultCoder; +import com.google.cloud.dataflow.sdk.io.*; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; +import org.joda.time.Duration; + +import java.io.IOException; +import java.util.List; + +/** + * To run the example, first open a socket on a terminal by executing the command: + * <li> + * <li> + * <code>nc -lk 9999</code> + * </li> + * </li> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. + * */ +public class AutoComplete { + + /** + * A PTransform that takes as input a list of tokens and returns + * the most common tokens per prefix. 
+ */ + public static class ComputeTopCompletions + extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final boolean recursive; + + protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.recursive = recursive; + } + + public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) { + return new ComputeTopCompletions(candidatesPerPrefix, recursive); + } + + @Override + public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) { + PCollection<CompletionCandidate> candidates = input + // First count how often each token appears. + .apply(new Count.PerElement<String>()) + + // Map the KV outputs of Count into our own CompletionCandiate class. + .apply(ParDo.named("CreateCompletionCandidates").of( + new DoFn<KV<String, Long>, CompletionCandidate>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue()); + c.output(cand); + } + })); + + // Compute the top via either a flat or recursive algorithm. + if (recursive) { + return candidates + .apply(new ComputeTopRecursive(candidatesPerPrefix, 1)) + .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); + } else { + return candidates + .apply(new ComputeTopFlat(candidatesPerPrefix, 1)); + } + } + } + + /** + * Lower latency, but more expensive. 
+ */ + private static class ComputeTopFlat + extends PTransform<PCollection<CompletionCandidate>, + PCollection<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + @Override + public PCollection<KV<String, List<CompletionCandidate>>> apply( + PCollection<CompletionCandidate> input) { + return input + // For each completion candidate, map it to all prefixes. + .apply(ParDo.of(new AllPrefixes(minPrefix))) + + // Find and return the top candiates for each prefix. + .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix) + .withHotKeyFanout(new HotKeyFanout())); + } + + private static class HotKeyFanout implements SerializableFunction<String, Integer> { + private static final long serialVersionUID = 0; + + @Override + public Integer apply(String input) { + return (int) Math.pow(4, 5 - input.length()); + } + } + } + + /** + * Cheaper but higher latency. + * + * <p> Returns two PCollections, the first is top prefixes of size greater + * than minPrefix, and the second is top prefixes of size exactly + * minPrefix. 
+ */ + private static class ComputeTopRecursive + extends PTransform<PCollection<CompletionCandidate>, + PCollectionList<KV<String, List<CompletionCandidate>>>> { + private static final long serialVersionUID = 0; + + private final int candidatesPerPrefix; + private final int minPrefix; + + public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) { + this.candidatesPerPrefix = candidatesPerPrefix; + this.minPrefix = minPrefix; + } + + private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> { + private static final long serialVersionUID = 0; + + @Override + public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) { + return elem.getKey().length() > minPrefix ? 0 : 1; + } + } + + private static class FlattenTops + extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + for (CompletionCandidate cc : c.element().getValue()) { + c.output(cc); + } + } + } + + @Override + public PCollectionList<KV<String, List<CompletionCandidate>>> apply( + PCollection<CompletionCandidate> input) { + if (minPrefix > 10) { + // Base case, partitioning to return the output in the expected format. + return input + .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix)) + .apply(Partition.of(2, new KeySizePartitionFn())); + } else { + // If a candidate is in the top N for prefix a...b, it must also be in the top + // N for a...bX for every X, which is typlically a much smaller set to consider. + // First, compute the top candidate for prefixes of size at least minPrefix + 1. + PCollectionList<KV<String, List<CompletionCandidate>>> larger = input + .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1)); + // Consider the top candidates for each prefix of length minPrefix + 1... 
+ PCollection<KV<String, List<CompletionCandidate>>> small = + PCollectionList + .of(larger.get(1).apply(ParDo.of(new FlattenTops()))) + // ...together with those (previously excluded) candidates of length + // exactly minPrefix... + .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() { + private static final long serialVersionUID = 0; + + @Override + public Boolean apply(CompletionCandidate c) { + return c.getValue().length() == minPrefix; + } + }))) + .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections()) + // ...set the key to be the minPrefix-length prefix... + .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix))) + // ...and (re)apply the Top operator to all of them together. + .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)); + + PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger + .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); + + return PCollectionList.of(flattenLarger).and(small); + } + } + } + + /** + * A DoFn that keys each candidate by all its prefixes. + */ + private static class AllPrefixes + extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { + private static final long serialVersionUID = 0; + + private final int minPrefix; + private final int maxPrefix; + public AllPrefixes(int minPrefix) { + this(minPrefix, Integer.MAX_VALUE); + } + public AllPrefixes(int minPrefix, int maxPrefix) { + this.minPrefix = minPrefix; + this.maxPrefix = maxPrefix; + } + @Override + public void processElement(ProcessContext c) { + String word = c.element().value; + for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { + KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element()); + c.output(kv); + } + } + } + + /** + * Class used to store tag-count pairs. 
+ */ + @DefaultCoder(AvroCoder.class) + static class CompletionCandidate implements Comparable<CompletionCandidate> { + private long count; + private String value; + + public CompletionCandidate(String value, long count) { + this.value = value; + this.count = count; + } + + public String getValue() { + return value; + } + + // Empty constructor required for Avro decoding. + @SuppressWarnings("unused") + public CompletionCandidate() {} + + @Override + public int compareTo(CompletionCandidate o) { + if (this.count < o.count) { + return -1; + } else if (this.count == o.count) { + return this.value.compareTo(o.value); + } else { + return 1; + } + } + + @Override + public boolean equals(Object other) { + if (other instanceof CompletionCandidate) { + CompletionCandidate that = (CompletionCandidate) other; + return this.count == that.count && this.value.equals(that.value); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Long.valueOf(count).hashCode() ^ value.hashCode(); + } + + @Override + public String toString() { + return "CompletionCandidate[" + value + ", " + count + "]"; + } + } + + static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + /** + * Takes as input a the top candidates per prefix, and emits an entity + * suitable for writing to Datastore. 
+ */ + static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> + implements DoFn.RequiresWindowAccess{ + + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + StringBuilder str = new StringBuilder(); + KV<String, List<CompletionCandidate>> elem = c.element(); + + str.append(elem.getKey() +" @ "+ c.window() +" -> "); + for(CompletionCandidate cand: elem.getValue()) { + str.append(cand.toString() + " "); + } + System.out.println(str.toString()); + c.output(str.toString()); + } + } + + /** + * Options supported by this class. + * + * <p> Inherits standard Dataflow configuration options. + */ + private interface Options extends WindowedWordCount.StreamingWordCountOptions { + @Description("Whether to use the recursive algorithm") + @Default.Boolean(true) + Boolean getRecursive(); + void setRecursive(Boolean value); + } + + public static void main(String[] args) throws IOException { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + PTransform<? super PBegin, PCollection<String>> readSource = + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream"); + WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + // Create the pipeline. 
+ Pipeline p = Pipeline.create(options); + PCollection<KV<String, List<CompletionCandidate>>> toWrite = p + .apply(readSource) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(ComputeTopCompletions.top(10, options.getRecursive())); + + toWrite + .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile())) + .apply(TextIO.Write.to("./outputAutoComplete.txt")); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java new file mode 100644 index 0000000..3a8bdb0 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java @@ -0,0 +1,158 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.joda.time.Duration; + +/** + * To run the example, first open two sockets on two terminals by executing the commands: + * <li> + * <li> + * <code>nc -lk 9999</code>, and + * </li> + * <li> + * <code>nc -lk 9998</code> + * </li> + * </li> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. + * */ +public class JoinExamples { + + static PCollection<String> joinEvents(PCollection<String> streamA, + PCollection<String> streamB) throws Exception { + + final TupleTag<String> firstInfoTag = new TupleTag<>(); + final TupleTag<String> secondInfoTag = new TupleTag<>(); + + // transform both input collections to tuple collections, where the keys are country + // codes in both cases. 
+ PCollection<KV<String, String>> firstInfo = streamA.apply( + ParDo.of(new ExtractEventDataFn())); + PCollection<KV<String, String>> secondInfo = streamB.apply( + ParDo.of(new ExtractEventDataFn())); + + // country code 'key' -> CGBKR (<event info>, <country name>) + PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple + .of(firstInfoTag, firstInfo) + .and(secondInfoTag, secondInfo) + .apply(CoGroupByKey.<String>create()); + + // Process the CoGbkResult elements generated by the CoGroupByKey transform. + // country code 'key' -> string of <event info>, <country name> + PCollection<KV<String, String>> finalResultCollection = + kvpCollection.apply(ParDo.named("Process").of( + new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + KV<String, CoGbkResult> e = c.element(); + String key = e.getKey(); + + String defaultA = "NO_VALUE"; + + // the following getOnly is a bit tricky because it expects to have + // EXACTLY ONE value in the corresponding stream and for the corresponding key. 
+ + String lineA = e.getValue().getOnly(firstInfoTag, defaultA); + for (String lineB : c.element().getValue().getAll(secondInfoTag)) { + // Generate a string that combines information from both collection values + c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); + } + } + })); + + return finalResultCollection + .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String result = c.element().getKey() + " -> " + c.element().getValue(); + System.out.println(result); + c.output(result); + } + })); + } + + static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String line = c.element().toLowerCase(); + String key = line.split("\\s")[0]; + c.output(KV.of(key, line)); + } + } + + private interface Options extends WindowedWordCount.StreamingWordCountOptions { + + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + PTransform<? super PBegin, PCollection<String>> readSourceA = + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); + PTransform<? super PBegin, PCollection<String>> readSourceB = + Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); + + WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + Pipeline p = Pipeline.create(options); + + // the following two 'applys' create multiple inputs to our pipeline, one for each + // of our two input sources. 
+ PCollection<String> streamA = p.apply(readSourceA) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + PCollection<String> streamB = p.apply(readSourceB) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<String> formattedResults = joinEvents(streamA, streamB); + formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); + p.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java new file mode 100644 index 0000000..55cdc22 --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.io.UnboundedSource; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.joda.time.Duration; + +import java.util.Properties; + +public class KafkaWindowedWordCountExample { + + static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from + static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact + static final String GROUP_ID = "myGroup"; // Default groupId + static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka + + public static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. 
+ for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + System.out.println(row); + c.output(row); + } + } + + public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { + @Description("The Kafka topic to read from") + @Default.String(KAFKA_TOPIC) + String getKafkaTopic(); + + void setKafkaTopic(String value); + + @Description("The Kafka Broker to read from") + @Default.String(KAFKA_BROKER) + String getBroker(); + + void setBroker(String value); + + @Description("The Zookeeper server to connect to") + @Default.String(ZOOKEEPER) + String getZookeeper(); + + void setZookeeper(String value); + + @Description("The groupId") + @Default.String(GROUP_ID) + String getGroup(); + + void setGroup(String value); + + } + + public static void main(String[] args) { + PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); + KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); + options.setJobName("KafkaExample"); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); + Pipeline pipeline = Pipeline.create(options); + + Properties p = new Properties(); + p.setProperty("zookeeper.connect", options.getZookeeper()); + p.setProperty("bootstrap.servers", options.getBroker()); + p.setProperty("group.id", options.getGroup()); + + // this is the Flink consumer that reads the input to + // the program from a kafka 
topic. + FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>( + options.getKafkaTopic(), + new SimpleStringSchema(), p); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputKafka.txt")); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java new file mode 100644 index 0000000..7eb69ba --- /dev/null +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.examples.streaming; + +import org.apache.beam.runners.flink.FlinkPipelineRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.*; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.*; +import com.google.cloud.dataflow.sdk.transforms.windowing.*; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * To run the example, first open a socket on a terminal by executing the command: + * <li> + * <li> + * <code>nc -lk 9999</code> + * </li> + * </li> + * and then launch the example. Now whatever you type in the terminal is going to be + * the input to the program. 
+ * */ +public class WindowedWordCount { + + private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); + + static final long WINDOW_SIZE = 10; // Default window duration in seconds + static final long SLIDE_SIZE = 5; // Default window slide in seconds + + static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + c.output(row); + } + } + + static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options { + @Description("Sliding window duration, in seconds") + @Default.Long(WINDOW_SIZE) + Long getWindowSize(); + + void setWindowSize(Long value); + + @Description("Window slide, in seconds") + @Default.Long(SLIDE_SIZE) + Long getSlide(); + + void setSlide(Long value); + } + + public static void main(String[] args) throws IOException { + StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class); + options.setStreaming(true); + options.setWindowSize(10L); + options.setSlide(5L); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + LOG.info("Windpwed WordCount with Sliding Windows of " + 
options.getWindowSize() + + " sec. and a slide of " + options.getSlide()); + + Pipeline pipeline = Pipeline.create(options); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) + .every(Duration.standardSeconds(options.getSlide()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputWordCount.txt")); + + pipeline.run(); + } +}
