[BEAM-1994] Remove Flink examples package
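This commit folds the former runners/flink/runner module into runners/flink itself and drops the examples module, so downstream builds address a single module. As a sketch of the effect on local builds (the module path and profile names are taken verbatim from the Jenkins goals change in this diff; it assumes a Beam source checkout with Maven installed), a ValidatesRunner run against the consolidated module would look like:

    mvn -B -e clean verify -am -pl runners/flink -Plocal-validates-runner-tests -Pvalidates-runner-tests

The old invocation pointed at runners/flink/runner, which no longer exists after this change.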
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cdd2544b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cdd2544b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cdd2544b

Branch: refs/heads/gearpump-runner
Commit: cdd2544ba6dd6ac4aa80c65ecd8e01ab3cf664aa
Parents: 8a00f22
Author: Ismaël Mejía <[email protected]>
Authored: Tue Apr 18 17:31:07 2017 +0200
Committer: Ismaël Mejía <[email protected]>
Committed: Wed Apr 19 13:37:06 2017 +0200

----------------------------------------------------------------------
 ...PostCommit_Java_ValidatesRunner_Flink.groovy | 2 +-
 runners/flink/examples/pom.xml | 130 ---
 .../beam/runners/flink/examples/TFIDF.java | 455 --------
 .../beam/runners/flink/examples/WordCount.java | 129 ---
 .../runners/flink/examples/package-info.java | 22 -
 .../flink/examples/streaming/AutoComplete.java | 400 -------
 .../flink/examples/streaming/JoinExamples.java | 154 ---
 .../examples/streaming/WindowedWordCount.java | 141 ---
 .../flink/examples/streaming/package-info.java | 22 -
 runners/flink/pom.xml | 275 ++++-
 runners/flink/runner/pom.xml | 330 ------
 .../flink/DefaultParallelismFactory.java | 39 -
 .../flink/FlinkBatchPipelineTranslator.java | 139 ---
 .../flink/FlinkBatchTransformTranslators.java | 723 ------------
 .../flink/FlinkBatchTranslationContext.java | 153 ---
 .../flink/FlinkDetachedRunnerResult.java | 75 --
 .../FlinkPipelineExecutionEnvironment.java | 241 ----
 .../runners/flink/FlinkPipelineOptions.java | 101 --
 .../runners/flink/FlinkPipelineTranslator.java | 53 -
 .../apache/beam/runners/flink/FlinkRunner.java | 232 ----
 .../runners/flink/FlinkRunnerRegistrar.java | 62 --
 .../beam/runners/flink/FlinkRunnerResult.java | 98 --
 .../flink/FlinkStreamingPipelineTranslator.java | 276 -----
 .../FlinkStreamingTransformTranslators.java | 1044 -----------------
 .../flink/FlinkStreamingTranslationContext.java | 130 ---
 .../flink/FlinkStreamingViewOverrides.java | 372 -------
 .../flink/PipelineTranslationOptimizer.java | 72 --
 .../beam/runners/flink/TestFlinkRunner.java | 84 --
 .../beam/runners/flink/TranslationMode.java | 31 -
 .../apache/beam/runners/flink/package-info.java | 22 -
 .../functions/FlinkAggregatorFactory.java | 53 -
 .../functions/FlinkAssignContext.java | 63 --
 .../functions/FlinkAssignWindows.java | 49 -
 .../functions/FlinkDoFnFunction.java | 161 ---
 .../FlinkMergingNonShuffleReduceFunction.java | 228 ----
 .../FlinkMergingPartialReduceFunction.java | 201 ----
 .../functions/FlinkMergingReduceFunction.java | 199 ----
 .../FlinkMultiOutputPruningFunction.java | 50 -
 .../functions/FlinkNoOpStepContext.java | 73 --
 .../functions/FlinkPartialReduceFunction.java | 172 ---
 .../functions/FlinkReduceFunction.java | 173 ---
 .../functions/FlinkSideInputReader.java | 80 --
 .../functions/FlinkStatefulDoFnFunction.java | 198 ----
 .../functions/SideInputInitializer.java | 73 --
 .../translation/functions/package-info.java | 22 -
 .../runners/flink/translation/package-info.java | 22 -
 .../translation/types/CoderTypeInformation.java | 120 --
 .../translation/types/CoderTypeSerializer.java | 132 ---
 .../types/EncodedValueComparator.java | 195 ----
 .../types/EncodedValueSerializer.java | 113 --
 .../types/EncodedValueTypeInformation.java | 98 --
 .../types/InspectableByteArrayOutputStream.java | 34 -
 .../flink/translation/types/KvKeySelector.java | 50 -
 .../flink/translation/types/package-info.java | 22 -
 .../utils/SerializedPipelineOptions.java | 67 --
 .../flink/translation/utils/package-info.java | 22 -
 .../wrappers/DataInputViewWrapper.java | 58 -
 .../wrappers/DataOutputViewWrapper.java | 51 -
 .../SerializableFnAggregatorWrapper.java | 98 --
 .../translation/wrappers/SourceInputFormat.java | 150 ---
 .../translation/wrappers/SourceInputSplit.java | 52 -
 .../translation/wrappers/package-info.java | 22 -
 .../wrappers/streaming/DoFnOperator.java | 774 -------------
 .../streaming/KvToByteBufferKeySelector.java | 56 -
 .../streaming/SingletonKeyedWorkItem.java | 56 -
 .../streaming/SingletonKeyedWorkItemCoder.java | 126 ---
 .../streaming/SplittableDoFnOperator.java | 150 ---
 .../wrappers/streaming/WindowDoFnOperator.java | 117 --
 .../wrappers/streaming/WorkItemKeySelector.java | 56 -
 .../streaming/io/BoundedSourceWrapper.java | 218 ----
 .../streaming/io/UnboundedSocketSource.java | 249 -----
 .../streaming/io/UnboundedSourceWrapper.java | 476 --------
 .../wrappers/streaming/io/package-info.java | 22 -
 .../wrappers/streaming/package-info.java | 22 -
 .../state/FlinkBroadcastStateInternals.java | 865 --------------
 .../state/FlinkKeyGroupStateInternals.java | 487 --------
 .../state/FlinkSplitStateInternals.java | 260 -----
 .../streaming/state/FlinkStateInternals.java | 1053 ------------------
 .../state/KeyGroupCheckpointedOperator.java | 35 -
 .../state/KeyGroupRestoringOperator.java | 32 -
 .../wrappers/streaming/state/package-info.java | 22 -
 .../runner/src/main/resources/log4j.properties | 23 -
 .../flink/EncodedValueComparatorTest.java | 70 --
 .../runners/flink/FlinkRunnerRegistrarTest.java | 48 -
 .../beam/runners/flink/FlinkTestPipeline.java | 72 --
 .../beam/runners/flink/PipelineOptionsTest.java | 184 ---
 .../beam/runners/flink/ReadSourceITCase.java | 85 --
 .../flink/ReadSourceStreamingITCase.java | 74 --
 .../beam/runners/flink/WriteSinkITCase.java | 192 ----
 .../flink/streaming/DoFnOperatorTest.java | 600 ----------
 .../FlinkBroadcastStateInternalsTest.java | 245 ----
 .../FlinkKeyGroupStateInternalsTest.java | 262 -----
 .../streaming/FlinkSplitStateInternalsTest.java | 101 --
 .../streaming/FlinkStateInternalsTest.java | 395 -------
 .../flink/streaming/GroupByNullKeyTest.java | 124 ---
 .../flink/streaming/TestCountingSource.java | 254 -----
 .../streaming/TopWikipediaSessionsITCase.java | 133 ---
 .../streaming/UnboundedSourceWrapperTest.java | 464 --------
 .../runners/flink/streaming/package-info.java | 22 -
 .../src/test/resources/log4j-test.properties | 27 -
 .../flink/DefaultParallelismFactory.java | 39 +
 .../flink/FlinkBatchPipelineTranslator.java | 139 +++
 .../flink/FlinkBatchTransformTranslators.java | 723 ++++++++++++
 .../flink/FlinkBatchTranslationContext.java | 153 +++
 .../flink/FlinkDetachedRunnerResult.java | 75 ++
 .../FlinkPipelineExecutionEnvironment.java | 241 ++++
 .../runners/flink/FlinkPipelineOptions.java | 101 ++
 .../runners/flink/FlinkPipelineTranslator.java | 53 +
 .../apache/beam/runners/flink/FlinkRunner.java | 232 ++++
 .../runners/flink/FlinkRunnerRegistrar.java | 62 ++
 .../beam/runners/flink/FlinkRunnerResult.java | 98 ++
 .../flink/FlinkStreamingPipelineTranslator.java | 276 +++++
 .../FlinkStreamingTransformTranslators.java | 1044 +++++++++++++++++
 .../flink/FlinkStreamingTranslationContext.java | 130 +++
 .../flink/FlinkStreamingViewOverrides.java | 372 +++++++
 .../flink/PipelineTranslationOptimizer.java | 72 ++
 .../beam/runners/flink/TestFlinkRunner.java | 84 ++
 .../beam/runners/flink/TranslationMode.java | 31 +
 .../apache/beam/runners/flink/package-info.java | 22 +
 .../functions/FlinkAggregatorFactory.java | 53 +
 .../functions/FlinkAssignContext.java | 63 ++
 .../functions/FlinkAssignWindows.java | 49 +
 .../functions/FlinkDoFnFunction.java | 161 +++
 .../FlinkMergingNonShuffleReduceFunction.java | 228 ++++
 .../FlinkMergingPartialReduceFunction.java | 201 ++++
 .../functions/FlinkMergingReduceFunction.java | 199 ++++
 .../FlinkMultiOutputPruningFunction.java | 50 +
 .../functions/FlinkNoOpStepContext.java | 73 ++
 .../functions/FlinkPartialReduceFunction.java | 172 +++
 .../functions/FlinkReduceFunction.java | 173 +++
 .../functions/FlinkSideInputReader.java | 80 ++
 .../functions/FlinkStatefulDoFnFunction.java | 198 ++++
 .../functions/SideInputInitializer.java | 73 ++
 .../translation/functions/package-info.java | 22 +
 .../runners/flink/translation/package-info.java | 22 +
 .../translation/types/CoderTypeInformation.java | 120 ++
 .../translation/types/CoderTypeSerializer.java | 132 +++
 .../types/EncodedValueComparator.java | 195 ++++
 .../types/EncodedValueSerializer.java | 113 ++
 .../types/EncodedValueTypeInformation.java | 98 ++
 .../types/InspectableByteArrayOutputStream.java | 34 +
 .../flink/translation/types/KvKeySelector.java | 50 +
 .../flink/translation/types/package-info.java | 22 +
 .../utils/SerializedPipelineOptions.java | 67 ++
 .../flink/translation/utils/package-info.java | 22 +
 .../wrappers/DataInputViewWrapper.java | 58 +
 .../wrappers/DataOutputViewWrapper.java | 51 +
 .../SerializableFnAggregatorWrapper.java | 98 ++
 .../translation/wrappers/SourceInputFormat.java | 150 +++
 .../translation/wrappers/SourceInputSplit.java | 52 +
 .../translation/wrappers/package-info.java | 22 +
 .../wrappers/streaming/DoFnOperator.java | 774 +++++++++++++
 .../streaming/KvToByteBufferKeySelector.java | 56 +
 .../streaming/SingletonKeyedWorkItem.java | 56 +
 .../streaming/SingletonKeyedWorkItemCoder.java | 126 +++
 .../streaming/SplittableDoFnOperator.java | 150 +++
 .../wrappers/streaming/WindowDoFnOperator.java | 117 ++
 .../wrappers/streaming/WorkItemKeySelector.java | 56 +
 .../streaming/io/BoundedSourceWrapper.java | 218 ++++
 .../streaming/io/UnboundedSocketSource.java | 249 +++++
 .../streaming/io/UnboundedSourceWrapper.java | 476 ++++++++
 .../wrappers/streaming/io/package-info.java | 22 +
 .../wrappers/streaming/package-info.java | 22 +
 .../state/FlinkBroadcastStateInternals.java | 865 ++++++++++++++
 .../state/FlinkKeyGroupStateInternals.java | 487 ++++++++
 .../state/FlinkSplitStateInternals.java | 260 +++++
 .../streaming/state/FlinkStateInternals.java | 1053 ++++++++++++++++++
 .../state/KeyGroupCheckpointedOperator.java | 35 +
 .../state/KeyGroupRestoringOperator.java | 32 +
 .../wrappers/streaming/state/package-info.java | 22 +
 .../flink/src/main/resources/log4j.properties | 23 +
 .../flink/EncodedValueComparatorTest.java | 70 ++
 .../runners/flink/FlinkRunnerRegistrarTest.java | 48 +
 .../beam/runners/flink/FlinkTestPipeline.java | 72 ++
 .../beam/runners/flink/PipelineOptionsTest.java | 184 +++
 .../beam/runners/flink/ReadSourceITCase.java | 85 ++
 .../flink/ReadSourceStreamingITCase.java | 74 ++
 .../beam/runners/flink/WriteSinkITCase.java | 192 ++++
 .../flink/streaming/DoFnOperatorTest.java | 600 ++++++++++
 .../FlinkBroadcastStateInternalsTest.java | 245 ++++
 .../FlinkKeyGroupStateInternalsTest.java | 262 +++++
 .../streaming/FlinkSplitStateInternalsTest.java | 101 ++
 .../streaming/FlinkStateInternalsTest.java | 395 +++++++
 .../flink/streaming/GroupByNullKeyTest.java | 124 +++
 .../flink/streaming/TestCountingSource.java | 254 +++++
 .../streaming/TopWikipediaSessionsITCase.java | 133 +++
 .../streaming/UnboundedSourceWrapperTest.java | 464 ++++++++
 .../runners/flink/streaming/package-info.java | 22 +
 .../src/test/resources/log4j-test.properties | 27 +
 189 files changed, 15765 insertions(+), 17293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
----------------------------------------------------------------------
diff --git a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
index 411106d..5b228bc 100644
--- a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
+++ b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Flink.groovy
@@ -39,5 +39,5 @@ mavenJob('beam_PostCommit_Java_ValidatesRunner_Flink') {
       'Run Flink ValidatesRunner')

   // Maven goals for this job.
-  goals('-B -e clean verify -am -pl runners/flink/runner -Plocal-validates-runner-tests -Pvalidates-runner-tests')
+  goals('-B -e clean verify -am -pl runners/flink -Plocal-validates-runner-tests -Pvalidates-runner-tests')
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml
deleted file mode 100644
index aaf76d9..0000000
--- a/runners/flink/examples/pom.xml
+++ /dev/null
@@ -1,130 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements. See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License. You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-flink-parent</artifactId> - <version>0.7.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>beam-runners-flink_2.10-examples</artifactId> - - <name>Apache Beam :: Runners :: Flink :: Examples</name> - - <packaging>jar</packaging> - - <properties> - <!-- Default parameters for mvn exec:java --> - <flink.examples.input>kinglear.txt</flink.examples.input> - <flink.examples.output>wordcounts.txt</flink.examples.output> - <flink.examples.parallelism>-1</flink.examples.parallelism> - </properties> - - <profiles> - <profile> - <id>disable-validates-runner-tests</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <executions> - <execution> - <id>validates-runner-tests</id> - <configuration> - <skip>true</skip> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> - - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-extensions-gcp-core</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-flink_2.10</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-0.8_2.10</artifactId> - <version>${flink.version}</version> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <goals><goal>analyze-only</goal></goals> - <configuration> - <!-- disable for now until dependencies are cleaned up --> - <failOnWarning>false</failOnWarning> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <configuration> - <executable>java</executable> - <arguments> - <argument>--runner=org.apache.beam.runners.flink.FlinkRunner</argument> - <argument>--parallelism=${flink.examples.parallelism}</argument> - <argument>--input=${flink.examples.input}</argument> - <argument>--output=${flink.examples.output}</argument> - </arguments> - </configuration> - </plugin> - - </plugins> - - </build> - -</project> http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java deleted file mode 100644 index 8e1df08..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ /dev/null @@ -1,455 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringDelegateCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Distinct; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An example that computes a basic TF-IDF search table for a directory or GCS prefix. 
- * - * <p>Concepts: joining data; side inputs; logging - * - * <p>To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * }</pre> - * and a local output file or output prefix on GCS: - * <pre>{@code - * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * }</pre> - * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowRunner - * and an output prefix on GCS: - * --output=gs://YOUR_OUTPUT_PREFIX - * }</pre> - * - * <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with - * {@code --input}. - */ -public class TFIDF { - /** - * Options supported by {@link TFIDF}. - * - * <p>Inherits standard configuration options. - */ - private interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path to the directory or GCS prefix containing files to read from") - @Default.String("gs://dataflow-samples/shakespeare/") - String getInput(); - void setInput(String value); - - @Description("Prefix of output URI to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - /** - * Lists documents contained beneath the {@code options.input} prefix/directory. - */ - public static Set<URI> listInputDocuments(Options options) - throws URISyntaxException, IOException { - URI baseUri = new URI(options.getInput()); - - // List all documents in the directory or GCS prefix. - URI absoluteUri; - if (baseUri.getScheme() != null) { - absoluteUri = baseUri; - } else { - absoluteUri = new URI( - "file", - baseUri.getAuthority(), - baseUri.getPath(), - baseUri.getQuery(), - baseUri.getFragment()); - } - - Set<URI> uris = new HashSet<>(); - if (absoluteUri.getScheme().equals("file")) { - File directory = new File(absoluteUri); - String[] directoryListing = directory.list(); - if (directoryListing == null) { - throw new IOException( - "Directory " + absoluteUri + " is not a valid path or IO Error occurred."); - } - for (String entry : directoryListing) { - File path = new File(directory, entry); - uris.add(path.toURI()); - } - } else if (absoluteUri.getScheme().equals("gs")) { - GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); - URI gcsUriGlob = new URI( - absoluteUri.getScheme(), - absoluteUri.getAuthority(), - absoluteUri.getPath() + "*", - absoluteUri.getQuery(), - absoluteUri.getFragment()); - for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { - uris.add(entry.toUri()); - } - } - - return uris; - } - - /** - * Reads the documents at the provided uris and returns all lines - * from the documents tagged with which document they are from. - */ - public static class ReadDocuments - extends PTransform<PBegin, PCollection<KV<URI, String>>> { - private static final long serialVersionUID = 0; - - // transient because PTransform is not really meant to be serialized. 
- // see note on PTransform - private final transient Iterable<URI> uris; - - public ReadDocuments(Iterable<URI> uris) { - this.uris = uris; - } - - @Override - public Coder<?> getDefaultOutputCoder() { - return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); - } - - @Override - public PCollection<KV<URI, String>> expand(PBegin input) { - Pipeline pipeline = input.getPipeline(); - - // Create one TextIO.Read transform for each document - // and add its output to a PCollectionList - PCollectionList<KV<URI, String>> urisToLines = - PCollectionList.empty(pipeline); - - // TextIO.Read supports: - // - file: URIs and paths locally - // - gs: URIs on the service - for (final URI uri : uris) { - String uriString; - if (uri.getScheme().equals("file")) { - uriString = new File(uri).getPath(); - } else { - uriString = uri.toString(); - } - - PCollection<KV<URI, String>> oneUriToLines = pipeline - .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString)) - .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); - - urisToLines = urisToLines.and(oneUriToLines); - } - - return urisToLines.apply(Flatten.<KV<URI, String>>pCollections()); - } - } - - /** - * A transform containing a basic TF-IDF pipeline. The input consists of KV objects - * where the key is the document's URI and the value is a piece - * of the document's content. The output is a mapping from terms to - * scores for each document URI. - */ - public static class ComputeTfIdf - extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> { - private static final long serialVersionUID = 0; - - public ComputeTfIdf() { } - - @Override - public PCollection<KV<String, KV<URI, Double>>> expand( - PCollection<KV<URI, String>> uriToContent) { - - // Compute the total number of documents, and - // prepare this singleton PCollectionView for - // use as a side input. - final PCollectionView<Long> totalDocuments = - uriToContent - .apply("GetURIs", Keys.<URI>create()) - .apply("DistinctDocs", Distinct.<URI>create()) - .apply(Count.<URI>globally()) - .apply(View.<Long>asSingleton()); - - // Create a collection of pairs mapping a URI to each - // of the words in the document associated with that URI. - PCollection<KV<URI, String>> uriToWords = uriToContent - .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - String line = c.element().getValue(); - for (String word : line.split("\\W+")) { - // Log INFO messages when the word "love" is found. - if (word.toLowerCase().equals("love")) { - LOG.info("Found {}", word.toLowerCase()); - } - - if (!word.isEmpty()) { - c.output(KV.of(uri, word.toLowerCase())); - } - } - } - })); - - // Compute a mapping from each word to the total - // number of documents in which it appears. - PCollection<KV<String, Long>> wordToDocCount = uriToWords - .apply("DistinctWords", Distinct.<KV<URI, String>>create()) - .apply(Values.<String>create()) - .apply("CountDocs", Count.<String>perElement()); - - // Compute a mapping from each URI to the total - // number of words in the document associated with that URI.
- PCollection<KV<URI, Long>> uriToWordTotal = uriToWords - .apply("GetURIs2", Keys.<URI>create()) - .apply("CountWords", Count.<URI>perElement()); - - // Count, for each (URI, word) pair, the number of - // occurrences of that word in the document associated - // with the URI. - PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords - .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement()); - - // Adjust the above collection to a mapping from - // (URI, word) pairs to counts into an isomorphic mapping - // from URI to (word, count) pairs, to prepare for a join - // by the URI key. - PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount - .apply("ShiftKeys", ParDo.of( - new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - URI uri = c.element().getKey().getKey(); - String word = c.element().getKey().getValue(); - Long occurrences = c.element().getValue(); - c.output(KV.of(uri, KV.of(word, occurrences))); - } - })); - - // Prepare to join the mapping of URI to (word, count) pairs with - // the mapping of URI to total word counts, by associating - // each of the input PCollection<KV<URI, ...>> with - // a tuple tag. Each input must have the same key type, URI - // in this case. The type parameter of the tuple tag matches - // the types of the values for each collection. - final TupleTag<Long> wordTotalsTag = new TupleTag<>(); - final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>(); - KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple - .of(wordTotalsTag, uriToWordTotal) - .and(wordCountsTag, uriToWordAndCount); - - // Perform a CoGroupByKey (a sort of pre-join) on the prepared - // inputs. This yields a mapping from URI to a CoGbkResult - // (CoGroupByKey Result). The CoGbkResult is a mapping - // from the above tuple tags to the values in each input - // associated with a particular URI. In this case, each - // KV<URI, CoGbkResult> group a URI with the total number of - // words in that document as well as all the (word, count) - // pairs for particular words. - PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput - .apply("CoGroupByUri", CoGroupByKey.<URI>create()); - - // Compute a mapping from each word to a (URI, term frequency) - // pair for each URI. A word's term frequency for a document - // is simply the number of times that word occurs in the document - // divided by the total number of words in the document. - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal - .apply("ComputeTermFrequencies", ParDo.of( - new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); - - for (KV<String, Long> wordAndCount - : c.element().getValue().getAll(wordCountsTag)) { - String word = wordAndCount.getKey(); - Long wordCount = wordAndCount.getValue(); - Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); - c.output(KV.of(word, KV.of(uri, termFrequency))); - } - } - })); - - // Compute a mapping from each word to its document frequency. - // A word's document frequency in a corpus is the number of - // documents in which the word appears divided by the total - // number of documents in the corpus. 
Note how the total number of - // documents is passed as a side input; the same value is - // presented to each invocation of the DoFn. - PCollection<KV<String, Double>> wordToDf = wordToDocCount - .apply("ComputeDocFrequencies", ParDo - .of(new DoFn<KV<String, Long>, KV<String, Double>>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Long documentCount = c.element().getValue(); - Long documentTotal = c.sideInput(totalDocuments); - Double documentFrequency = documentCount.doubleValue() - / documentTotal.doubleValue(); - - c.output(KV.of(word, documentFrequency)); - } - }).withSideInputs(totalDocuments)); - - // Join the term frequency and document frequency - // collections, each keyed on the word. - final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>(); - final TupleTag<Double> dfTag = new TupleTag<>(); - PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple - .of(tfTag, wordToUriAndTf) - .and(dfTag, wordToDf) - .apply(CoGroupByKey.<String>create()); - - // Compute a mapping from each word to a (URI, TF-IDF) score - // for each URI. There are a variety of definitions of TF-IDF - // ("term frequency - inverse document frequency") score; - // here we use a basic version that is the term frequency - // divided by the log of the document frequency. - - return wordToUriAndTfAndDf - .apply("ComputeTfIdf", ParDo.of( - new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Double df = c.element().getValue().getOnly(dfTag); - - for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) { - URI uri = uriAndTf.getKey(); - Double tf = uriAndTf.getValue(); - Double tfIdf = tf * Math.log(1 / df); - c.output(KV.of(word, KV.of(uri, tfIdf))); - } - } - })); - } - - // Instantiate Logger. - // It is suggested that the user specify the class name of the containing class - // (in this case ComputeTfIdf). - private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); - } - - /** - * A {@link PTransform} to write, in CSV format, a mapping from term and URI - * to score. 
- */ - public static class WriteTfIdf - extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> { - private static final long serialVersionUID = 0; - - private String output; - - public WriteTfIdf(String output) { - this.output = output; - } - - @Override - public PDone expand(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { - return wordToUriAndTfIdf - .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - c.output(String.format("%s,\t%s,\t%f", - c.element().getKey(), - c.element().getValue().getKey(), - c.element().getValue().getValue())); - } - })) - .apply(TextIO.Write - .to(output) - .withSuffix(".csv")); - } - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - options.setRunner(FlinkRunner.class); - - Pipeline pipeline = Pipeline.create(options); - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - pipeline - .apply(new ReadDocuments(listInputDocuments(options))) - .apply(new ComputeTfIdf()) - .apply(new WriteTfIdf(options.getOutput())); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java deleted file mode 100644 index 6ae4cf8..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.examples; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * Wordcount pipeline. - */ -public class WordCount { - - /** - * Function to extract words. - */ - public static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** - * PTransform counting words. - */ - public static class CountWords extends PTransform<PCollection<String>, - PCollection<KV<String, Long>>> { - @Override - public PCollection<KV<String, Long>> expand(PCollection<String> lines) { - - // Convert lines of text into individual words. - PCollection<String> words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - return wordCounts; - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { - @Override - public String apply(KV<String, Long> input) { - return input.getKey() + ": " + input.getValue(); - } - } - - /** - * Options supported by {@link WordCount}. - * - * <p>Inherits standard configuration options. 
- */ - public interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path of the file to read from") - String getInput(); - void setInput(String value); - - @Description("Path of the file to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) { - - Options options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(Options.class); - options.setRunner(FlinkRunner.class); - - Pipeline p = Pipeline.create(options); - - p.apply("ReadLines", TextIO.Read.from(options.getInput())) - .apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply("WriteCounts", TextIO.Write.to(options.getOutput())); - - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java deleted file mode 100644 index b0ecb56..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Flink Beam runner example. - */ -package org.apache.beam.runners.flink.examples; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java deleted file mode 100644 index d07df29..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import java.io.IOException; -import java.util.List; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Partition; -import org.apache.beam.sdk.transforms.Partition.PartitionFn; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.Top; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.joda.time.Duration; - -/** - * To run the example, first open a socket on a terminal by executing the command: - * <ul> - * <li><code>nc -lk 9999</code> - * </ul> - * and then launch the example. Now whatever you type in the terminal is going to be - * the input to the program. - * */ -public class AutoComplete { - - /** - * A PTransform that takes as input a list of tokens and returns - * the most common tokens per prefix. - */ - public static class ComputeTopCompletions - extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final boolean recursive; - - protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.recursive = recursive; - } - - public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) { - return new ComputeTopCompletions(candidatesPerPrefix, recursive); - } - - @Override - public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) { - PCollection<CompletionCandidate> candidates = input - // First count how often each token appears. - .apply(Count.<String>perElement()) - - // Map the KV outputs of Count into our own CompletionCandidate class.
- .apply("CreateCompletionCandidates", ParDo.of( - new DoFn<KV<String, Long>, CompletionCandidate>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), - c.element().getValue()); - c.output(cand); - } - })); - - // Compute the top via either a flat or recursive algorithm. - if (recursive) { - return candidates - .apply(new ComputeTopRecursive(candidatesPerPrefix, 1)) - .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); - } else { - return candidates - .apply(new ComputeTopFlat(candidatesPerPrefix, 1)); - } - } - } - - /** - * Lower latency, but more expensive. - */ - private static class ComputeTopFlat - extends PTransform<PCollection<CompletionCandidate>, - PCollection<KV<String, List<CompletionCandidate>>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - @Override - public PCollection<KV<String, List<CompletionCandidate>>> expand( - PCollection<CompletionCandidate> input) { - return input - // For each completion candidate, map it to all prefixes. - .apply(ParDo.of(new AllPrefixes(minPrefix))) - - // Find and return the top candiates for each prefix. - .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix) - .withHotKeyFanout(new HotKeyFanout())); - } - - private static class HotKeyFanout implements SerializableFunction<String, Integer> { - private static final long serialVersionUID = 0; - - @Override - public Integer apply(String input) { - return (int) Math.pow(4, 5 - input.length()); - } - } - } - - /** - * Cheaper but higher latency. - * - * <p>Returns two PCollections, the first is top prefixes of size greater - * than minPrefix, and the second is top prefixes of size exactly - * minPrefix. - */ - private static class ComputeTopRecursive - extends PTransform<PCollection<CompletionCandidate>, - PCollectionList<KV<String, List<CompletionCandidate>>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> { - private static final long serialVersionUID = 0; - - @Override - public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) { - return elem.getKey().length() > minPrefix ? 0 : 1; - } - } - - private static class FlattenTops - extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - for (CompletionCandidate cc : c.element().getValue()) { - c.output(cc); - } - } - } - - @Override - public PCollectionList<KV<String, List<CompletionCandidate>>> expand( - PCollection<CompletionCandidate> input) { - if (minPrefix > 10) { - // Base case, partitioning to return the output in the expected format. 
- return input - .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix)) - .apply(Partition.of(2, new KeySizePartitionFn())); - } else { - // If a candidate is in the top N for prefix a...b, it must also be in the top - // N for a...bX for every X, which is typically a much smaller set to consider. - // First, compute the top candidate for prefixes of size at least minPrefix + 1. - PCollectionList<KV<String, List<CompletionCandidate>>> larger = input - .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1)); - // Consider the top candidates for each prefix of length minPrefix + 1... - PCollection<KV<String, List<CompletionCandidate>>> small = - PCollectionList - .of(larger.get(1).apply(ParDo.of(new FlattenTops()))) - // ...together with those (previously excluded) candidates of length - // exactly minPrefix... - .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() { - private static final long serialVersionUID = 0; - - @Override - public Boolean apply(CompletionCandidate c) { - return c.getValue().length() == minPrefix; - } - }))) - .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections()) - // ...set the key to be the minPrefix-length prefix... - .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix))) - // ...and (re)apply the Top operator to all of them together. - .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix)); - - PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger - .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); - - return PCollectionList.of(flattenLarger).and(small); - } - } - } - - /** - * A DoFn that keys each candidate by all its prefixes. - */ - private static class AllPrefixes - extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> { - private static final long serialVersionUID = 0; - - private final int minPrefix; - private final int maxPrefix; - public AllPrefixes(int minPrefix) { - this(minPrefix, Integer.MAX_VALUE); - } - public AllPrefixes(int minPrefix, int maxPrefix) { - this.minPrefix = minPrefix; - this.maxPrefix = maxPrefix; - } - @ProcessElement - public void processElement(ProcessContext c) { - String word = c.element().value; - for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { - KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element()); - c.output(kv); - } - } - } - - /** - * Class used to store tag-count pairs. - */ - @DefaultCoder(AvroCoder.class) - static class CompletionCandidate implements Comparable<CompletionCandidate> { - private long count; - private String value; - - public CompletionCandidate(String value, long count) { - this.value = value; - this.count = count; - } - - public String getValue() { - return value; - } - - // Empty constructor required for Avro decoding.
- @SuppressWarnings("unused") - public CompletionCandidate() {} - - @Override - public int compareTo(CompletionCandidate o) { - if (this.count < o.count) { - return -1; - } else if (this.count == o.count) { - return this.value.compareTo(o.value); - } else { - return 1; - } - } - - @Override - public boolean equals(Object other) { - if (other instanceof CompletionCandidate) { - CompletionCandidate that = (CompletionCandidate) other; - return this.count == that.count && this.value.equals(that.value); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Long.valueOf(count).hashCode() ^ value.hashCode(); - } - - @Override - public String toString() { - return "CompletionCandidate[" + value + ", " + count + "]"; - } - } - - static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** - * Takes as input a the top candidates per prefix, and emits an entity suitable for writing to - * Datastore. - */ - static class FormatForPerTaskLocalFile - extends DoFn<KV<String, List<CompletionCandidate>>, String> { - - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - StringBuilder str = new StringBuilder(); - KV<String, List<CompletionCandidate>> elem = c.element(); - - str.append(elem.getKey() + " @ " + window + " -> "); - for (CompletionCandidate cand: elem.getValue()) { - str.append(cand.toString() + " "); - } - System.out.println(str.toString()); - c.output(str.toString()); - } - } - - /** - * Options supported by this class. - * - * <p>Inherits standard Dataflow configuration options. - */ - private interface Options extends WindowedWordCount.StreamingWordCountOptions { - @Description("Whether to use the recursive algorithm") - @Default.Boolean(true) - Boolean getRecursive(); - void setRecursive(Boolean value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkRunner.class); - - - WindowFn<Object, ?> windowFn = - FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); - - // Create the pipeline. 
- Pipeline p = Pipeline.create(options); - PCollection<KV<String, List<CompletionCandidate>>> toWrite = p - .apply("WordStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - .apply(ComputeTopCompletions.top(10, options.getRecursive())); - - toWrite - .apply("FormatForPerTaskFile", ParDo.of(new FormatForPerTaskLocalFile())) - .apply(TextIO.Write.to("./outputAutoComplete.txt")); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java deleted file mode 100644 index 8fefc9f..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; - -/** - * To run the example, first open two sockets on two terminals by executing the commands: - * <ul> - * <li><code>nc -lk 9999</code>, and - * <li><code>nc -lk 9998</code> - * </ul> - * and then launch the example. Now whatever you type in the terminal is going to be - * the input to the program. 
- * */ -public class JoinExamples { - - static PCollection<String> joinEvents(PCollection<String> streamA, - PCollection<String> streamB) throws Exception { - - final TupleTag<String> firstInfoTag = new TupleTag<>(); - final TupleTag<String> secondInfoTag = new TupleTag<>(); - - // transform both input collections to tuple collections, where the keys are country - // codes in both cases. - PCollection<KV<String, String>> firstInfo = streamA.apply( - ParDo.of(new ExtractEventDataFn())); - PCollection<KV<String, String>> secondInfo = streamB.apply( - ParDo.of(new ExtractEventDataFn())); - - // country code 'key' -> CGBKR (<event info>, <country name>) - PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple - .of(firstInfoTag, firstInfo) - .and(secondInfoTag, secondInfo) - .apply(CoGroupByKey.<String>create()); - - // Process the CoGbkResult elements generated by the CoGroupByKey transform. - // country code 'key' -> string of <event info>, <country name> - PCollection<KV<String, String>> finalResultCollection = - kvpCollection.apply("Process", ParDo.of( - new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - KV<String, CoGbkResult> e = c.element(); - String key = e.getKey(); - - String defaultA = "NO_VALUE"; - - // the following getOnly is a bit tricky because it expects to have - // EXACTLY ONE value in the corresponding stream and for the corresponding key. - - String lineA = e.getValue().getOnly(firstInfoTag, defaultA); - for (String lineB : c.element().getValue().getAll(secondInfoTag)) { - // Generate a string that combines information from both collection values - c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); - } - } - })); - - return finalResultCollection - .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - String result = c.element().getKey() + " -> " + c.element().getValue(); - System.out.println(result); - c.output(result); - } - })); - } - - static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { - private static final long serialVersionUID = 0; - - @ProcessElement - public void processElement(ProcessContext c) { - String line = c.element().toLowerCase(); - String key = line.split("\\s")[0]; - c.output(KV.of(key, line)); - } - } - - private interface Options extends WindowedWordCount.StreamingWordCountOptions { - - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkRunner.class); - - WindowFn<Object, ?> windowFn = FixedWindows.of( - Duration.standardSeconds(options.getWindowSize())); - - Pipeline p = Pipeline.create(options); - - // the following two 'applys' create multiple inputs to our pipeline, one for each - // of our two input sources. 
- PCollection<String> streamA = p - .apply("FirstStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - PCollection<String> streamB = p - .apply("SecondStream", Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3))) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<String> formattedResults = joinEvents(streamA, streamB); - formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java deleted file mode 100644 index 792c214..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import java.io.IOException; -import org.apache.beam.runners.flink.FlinkRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * To run the example, first open a socket on a terminal by executing the command: - * <ul> - * <li><code>nc -lk 9999</code> - * </ul> - * and then launch the example. Now whatever you type in the terminal is going to be - * the input to the program. 
- */ -public class WindowedWordCount { - - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); - - static final long WINDOW_SIZE = 10; // Default window duration in seconds - static final long SLIDE_SIZE = 5; // Default window slide in seconds - - static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { - @ProcessElement - public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " - + c.timestamp().toString(); - c.output(row); - } - } - - static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", Sum.ofLongs()); - - @ProcessElement - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** - * Pipeline options. - */ - public interface StreamingWordCountOptions - extends org.apache.beam.runners.flink.examples.WordCount.Options { - @Description("Sliding window duration, in seconds") - @Default.Long(WINDOW_SIZE) - Long getWindowSize(); - - void setWindowSize(Long value); - - @Description("Window slide, in seconds") - @Default.Long(SLIDE_SIZE) - Long getSlide(); - - void setSlide(Long value); - } - - public static void main(String[] args) throws IOException { - StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(StreamingWordCountOptions.class); - options.setStreaming(true); - options.setWindowSize(10L); - options.setSlide(5L); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkRunner.class); - - LOG.info("Windowed WordCount with Sliding Windows of " + options.getWindowSize() - + " sec. 
and a slide of " + options.getSlide()); - - Pipeline pipeline = Pipeline.create(options); - - PCollection<String> words = pipeline - .apply("StreamingWordCount", - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(SlidingWindows.of( - Duration.standardSeconds(options.getWindowSize())) - .every(Duration.standardSeconds(options.getSlide()))) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())) - .apply(TextIO.Write.to("./outputWordCount.txt")); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java deleted file mode 100644 index 58f41b6..0000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Flink Beam runner examples. 
- */ -package org.apache.beam.runners.flink.examples.streaming; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index a5c5ea0..351035e 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -26,22 +26,97 @@ <relativePath>../pom.xml</relativePath> </parent> - <artifactId>beam-runners-flink-parent</artifactId> + <artifactId>beam-runners-flink</artifactId> <name>Apache Beam :: Runners :: Flink</name> - - <packaging>pom</packaging> - - <modules> - <module>runner</module> - <module>examples</module> - </modules> + <packaging>jar</packaging> <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <flink.version>1.2.0</flink.version> </properties> + <profiles> + <profile> + <id>local-validates-runner-tests</id> + <activation><activeByDefault>false</activeByDefault></activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + + <!-- This configures the inherited validates-runner-tests + execution to execute with a local Flink instance. --> + <execution> + <id>validates-runner-tests</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups> + <excludedGroups> + org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.UsesTestStream + </excludedGroups> + <parallel>none</parallel> + <failIfNoTests>true</failIfNoTests> + <dependenciesToScan> + <dependency>org.apache.beam:beam-sdks-java-core</dependency> + </dependenciesToScan> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--runner=TestFlinkRunner", + "--streaming=false" + ] + </beamTestPipelineOptions> + </systemPropertyVariables> + </configuration> + </execution> + + <!-- This second execution runs the tests in streaming mode --> + <execution> + <id>streaming-validates-runner-tests</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups> + <excludedGroups> + org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, + org.apache.beam.sdk.testing.UsesSetState, + org.apache.beam.sdk.testing.UsesMapState, + org.apache.beam.sdk.testing.UsesAttemptedMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.UsesTestStream + </excludedGroups> + <parallel>none</parallel> + <failIfNoTests>true</failIfNoTests> + <dependenciesToScan> + <dependency>org.apache.beam:beam-sdks-java-core</dependency> + </dependenciesToScan> + <systemPropertyVariables> + <beamTestPipelineOptions> + [ + "--runner=TestFlinkRunner", + "--streaming=true" + ] + </beamTestPipelineOptions> + </systemPropertyVariables> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + <build> <pluginManagement> <plugins> @@ -89,19 +164,103 @@ <!-- Flink dependencies --> <dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.10</artifactId> <version>${flink.version}</version> </dependency> + <!-- For testing --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> </dependency> + <!-- Beam --> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-java</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-construction-java</artifactId> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> </dependency> <dependency> @@ -113,5 +272,101 @@ <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> + + <!-- + Force an upgrade on the version of Apache Commons from Flink to support DEFLATE compression. 
+ --> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + <scope>runtime</scope> + </dependency> + + <!-- Test scoped --> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <!-- Depend on test jar to scan for ValidatesRunner tests --> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-bigquery</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>apacheds-jdbm1</artifactId> + <groupId>org.apache.directory.jdbm</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- Optional Pipeline Registration --> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <optional>true</optional> + </dependency> + + <!-- transitive test dependencies from beam-sdk-java-core --> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-common-fn-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> </project>
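----------------------------------------------------------------------

Note on the local-validates-runner-tests profile above: its two surefire executions scan beam-sdks-java-core for JUnit tests in the org.apache.beam.sdk.testing.ValidatesRunner category and run them against TestFlinkRunner, once with "--streaming=false" and once with "--streaming=true". A minimal sketch of the kind of test those executions pick up (the class name and asserted values are illustrative, not part of this commit; TestPipeline, PAssert, and Create are the Beam testing APIs the profile relies on):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/** Illustrative ValidatesRunner-style test; the real ones live in beam-sdks-java-core. */
public class ExampleValidatesRunnerTest {

  // TestPipeline picks up --runner=TestFlinkRunner from the
  // beamTestPipelineOptions system property set by the surefire executions.
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  @Category(ValidatesRunner.class)
  public void testCreateRoundTrips() {
    PCollection<String> output = p.apply(Create.of("a", "b", "c"));
    PAssert.that(output).containsInAnyOrder("a", "b", "c");
    p.run();
  }
}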

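----------------------------------------------------------------------

With the examples module gone, the consolidated beam-runners-flink jar is exercised through ordinary user pipelines instead. A minimal batch sketch, assuming only APIs that appear in the removed sources (FlinkRunner, PipelineOptionsFactory, ParDo, Count, TextIO.Read/Write); the class name and file paths are illustrative:

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;

/** Illustrative pipeline pinned to FlinkRunner, whose package is untouched by this commit. */
public class FlinkRunnerSmoke {

  /** Formats each element/count pair as "element: count". */
  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element().getKey() + ": " + c.element().getValue());
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Only the Maven module layout changed (runners/flink/runner -> runners/flink);
    // the runner class itself is selected exactly as before.
    options.setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.from("./input.txt"))      // illustrative input path
        .apply(Count.<String>perElement())        // counts identical lines
        .apply(ParDo.of(new FormatAsStringFn()))
        .apply(TextIO.Write.to("./output.txt"));  // illustrative output path
    p.run();
  }
}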