This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 82b6afa [BEAM-6493] Convert the WordCount samples to Kotlin (#8291)
82b6afa is described below
commit 82b6afaf363355c7b96d710f9e071443eb4b2bf6
Author: Harshit Dwivedi <[email protected]>
AuthorDate: Sat Apr 13 00:16:04 2019 +0530
[BEAM-6493] Convert the WordCount samples to Kotlin (#8291)
[BEAM-6493] Convert the WordCount samples to Kotlin
---
.test-infra/jenkins/job_PreCommit_Java.groovy | 1 +
.../job_PreCommit_Java_Examples_Dataflow.groovy | 1 +
.../job_PreCommit_Java_PortabilityApi.groovy | 1 +
.test-infra/jenkins/job_PreCommit_Spotless.groovy | 1 +
examples/{ => kotlin}/OWNERS | 1 +
examples/kotlin/README.md | 57 +++
examples/kotlin/build.gradle | 146 ++++++++
.../beam/examples/kotlin/DebuggingWordCount.kt | 162 +++++++++
.../beam/examples/kotlin/MinimalWordCount.kt | 122 +++++++
.../beam/examples/kotlin/WindowedWordCount.kt | 216 +++++++++++
.../org/apache/beam/examples/kotlin/WordCount.kt | 188 ++++++++++
.../kotlin/common/ExampleBigQueryTableOptions.kt | 49 +++
.../beam/examples/kotlin/common/ExampleOptions.kt | 31 +-
.../ExamplePubsubTopicAndSubscriptionOptions.kt | 43 +++
.../kotlin/common/ExamplePubsubTopicOptions.kt | 43 +++
.../beam/examples/kotlin/common/ExampleUtils.kt | 403 +++++++++++++++++++++
.../kotlin/common/WriteOneFilePerWindow.kt | 99 +++++
.../examples/DebuggingWordCountTestKotlin.java | 57 +++
.../beam/examples/MinimalWordCountTestKotlin.java | 91 +++++
.../beam/examples/WindowedWordCountITKotlin.kt | 224 ++++++++++++
.../apache/beam/examples/WordCountITKotlin.java | 72 ++++
examples/kotlin/src/test/resources/LICENSE | 202 +++++++++++
settings.gradle | 2 +
23 files changed, 2194 insertions(+), 18 deletions(-)
diff --git a/.test-infra/jenkins/job_PreCommit_Java.groovy
b/.test-infra/jenkins/job_PreCommit_Java.groovy
index 00a67b4..6d63979 100644
--- a/.test-infra/jenkins/job_PreCommit_Java.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java.groovy
@@ -28,6 +28,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
'^sdks/java/.*$',
'^runners/.*$',
'^examples/java/.*$',
+ '^examples/kotlin/.*$',
'^release/.*$',
]
)
diff --git a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
index e02dd12..7855085 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
@@ -28,6 +28,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
'^sdks/java/.*$',
'^runners/google-cloud-dataflow-java/.*$',
'^examples/java/.*$',
+ '^examples/kotlin/.*$',
'^release/.*$',
]
)
diff --git a/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy
b/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy
index 5dfc426..37cdd07 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy
@@ -28,6 +28,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
'^sdks/java/.*$',
'^runners/google-cloud-dataflow-java/worker.*$',
'^examples/java/.*$',
+ '^examples/kotlin/.*$',
'^release/.*$',
]
)
diff --git a/.test-infra/jenkins/job_PreCommit_Spotless.groovy
b/.test-infra/jenkins/job_PreCommit_Spotless.groovy
index 68eb312..7dc033d 100644
--- a/.test-infra/jenkins/job_PreCommit_Spotless.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Spotless.groovy
@@ -27,6 +27,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
'^sdks/java/.*$',
'^runners/.*$',
'^examples/java/.*$',
+ '^examples/kotlin/.*$',
]
)
builder.build()
diff --git a/examples/OWNERS b/examples/kotlin/OWNERS
similarity index 86%
rename from examples/OWNERS
rename to examples/kotlin/OWNERS
index 3a2b349..3811246 100644
--- a/examples/OWNERS
+++ b/examples/kotlin/OWNERS
@@ -1,4 +1,5 @@
# See the OWNERS docs at https://s.apache.org/beam-owners
reviewers:
+ - lukecwik
- aaltay
diff --git a/examples/kotlin/README.md b/examples/kotlin/README.md
new file mode 100644
index 0000000..a820a36
--- /dev/null
+++ b/examples/kotlin/README.md
@@ -0,0 +1,57 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Example Pipelines
+
+The examples included in this module serve to demonstrate the basic
+functionality of Apache Beam, and act as starting points for
+the development of more complex pipelines.
+
+## Word Count
+
+A good starting point for new users is our set of
+[word
count](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin)
examples, which computes word frequencies. This series of four successively
more detailed pipelines is described in detail in the accompanying
[walkthrough](https://beam.apache.org/get-started/wordcount-example/).
+
+1.
[`MinimalWordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt)
is the simplest word count pipeline and introduces basic concepts like
[Pipelines](https://beam.apache.org/documentation/programming-guide/#pipeline),
+[PCollections](https://beam.apache.org/documentation/programming-guide/#pcollection),
+[ParDo](https://beam.apache.org/documentation/programming-guide/#transforms-pardo),
+and [reading and writing
data](https://beam.apache.org/documentation/programming-guide/#io) from
external storage.
+
+1.
[`WordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt)
introduces best practices like
[PipelineOptions](https://beam.apache.org/documentation/programming-guide/#pipeline)
and custom
[PTransforms](https://beam.apache.org/documentation/programming-guide/#transforms-composite).
+
+1.
[`DebuggingWordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt)
+demonstrates some best practices for instrumenting your pipeline code.
+
+1.
[`WindowedWordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt)
shows how to run the same pipeline over either unbounded PCollections in
streaming mode or bounded PCollections in batch mode.
+
+## Running Examples
+
+See [Apache Beam WordCount
Example](https://beam.apache.org/get-started/wordcount-example/) for
information on running these examples.
+
+## Beyond Word Count [WIP]
+
+After you've finished running your first few word count pipelines, take a look
at the
[cookbook](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook)
+directory for some common and useful patterns like joining, filtering, and
combining.
+
+The
[complete](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/complete)
+directory contains a few realistic end-to-end pipelines.
+
+See the other examples as well. This directory includes a Java 8 version of the
+MinimalWordCount example, as well as a series of examples in a simple 'mobile
+gaming' domain. This series introduces some advanced concepts.
diff --git a/examples/kotlin/build.gradle b/examples/kotlin/build.gradle
new file mode 100644
index 0000000..9d417e3
--- /dev/null
+++ b/examples/kotlin/build.gradle
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module'
+ id 'org.jetbrains.kotlin.jvm' version '1.3.21'
+}
+
+applyJavaNature(exportJavadoc: false)
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: Examples :: Kotlin"
+ext.summary = """Apache Beam SDK provides a simple, Kotlin-based
+interface for processing virtually any size data. This
+artifact includes all Apache Beam Kotlin SDK examples."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+// TODO: Add apexRunner - https://issues.apache.org/jira/browse/BEAM-3583
+def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"]
+for (String runner : preCommitRunners) {
+ configurations.create(runner + "PreCommit")
+}
+configurations.sparkRunnerPreCommit {
+ // Ban certain dependencies to prevent a StackOverflow within Spark
+ // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14
+ exclude group: "org.slf4j", module: "jul-to-slf4j"
+ exclude group: "org.slf4j", module: "slf4j-jdk14"
+}
+
+dependencies {
+ shadow library.java.vendored_guava_20_0
+ shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+ shadow project(path:
":beam-sdks-java-extensions-google-cloud-platform-core", configuration:
"shadow")
+ shadow project(path: ":beam-sdks-java-io-google-cloud-platform",
configuration: "shadow")
+ shadow library.java.google_api_client
+ shadow library.java.google_api_services_bigquery
+ shadow library.java.google_http_client
+ shadow library.java.bigdataoss_util
+ shadow library.java.google_auth_library_oauth2_http
+ shadow library.java.google_auth_library_credentials
+ shadow library.java.avro
+ shadow library.java.google_api_services_pubsub
+ shadow library.java.datastore_v1_proto_client
+ shadow library.java.datastore_v1_protos
+ shadow library.java.joda_time
+ shadow library.java.slf4j_api
+ shadow library.java.slf4j_jdk14
+ runtime project(path: ":beam-runners-direct-java", configuration: "shadow")
+ shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform",
configuration: "shadow")
+ shadowTest library.java.hamcrest_core
+ shadowTest library.java.hamcrest_library
+ shadowTest library.java.junit
+ shadowTest library.java.mockito_core
+
+ // Add dependencies for the PreCommit configurations
+ // For each runner a project level dependency on the examples project.
+ for (String runner : preCommitRunners) {
+ delegate.add(runner + "PreCommit", project(path: ":beam-examples-kotlin",
configuration: "shadow"))
+ delegate.add(runner + "PreCommit", project(path: ":beam-examples-kotlin",
configuration: "shadowTest"))
+ }
+ // https://issues.apache.org/jira/browse/BEAM-3583
+ // apexRunnerPreCommit project(path: ":beam-runners-apex", configuration:
"shadow")
+ directRunnerPreCommit project(path: ":beam-runners-direct-java",
configuration: "shadow")
+ flinkRunnerPreCommit project(path: ":beam-runners-flink_2.11",
configuration: "shadow")
+ // TODO: Make the netty version used configurable, we add netty-all
4.1.17.Final so it appears on the classpath
+ // before 4.1.8.Final defined by Apache Beam
+ sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
+ sparkRunnerPreCommit project(path: ":beam-runners-spark", configuration:
"shadow")
+ sparkRunnerPreCommit project(path: ":beam-sdks-java-io-hadoop-file-system",
configuration: "shadow")
+ sparkRunnerPreCommit library.java.spark_streaming
+ sparkRunnerPreCommit library.java.spark_core
+ implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
+}
+
+/*
+ * Create a ${runner}PreCommit task for each runner which runs a set
+ * of integration tests for WordCount and WindowedWordCount.
+ */
+def preCommitRunnerClass = [
+ apexRunner: "org.apache.beam.runners.apex.TestApexRunner",
+ directRunner: "org.apache.beam.runners.direct.DirectRunner",
+ flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner",
+ sparkRunner: "org.apache.beam.runners.spark.TestSparkRunner",
+]
+def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+def gcsTempRoot = project.findProperty('gcsTempRoot') ?:
'gs://temp-storage-for-end-to-end-tests/'
+
+for (String runner : preCommitRunners) {
+ tasks.create(name: runner + "PreCommit", type: Test) {
+ def preCommitBeamTestPipelineOptions = [
+ "--project=${gcpProject}",
+ "--tempRoot=${gcsTempRoot}",
+ "--runner=" + preCommitRunnerClass[runner],
+ ]
+ classpath = configurations."${runner}PreCommit"
+ include "**/WordCountIT.class"
+ if (!"sparkRunner".equals(runner)) {
+ include "**/WindowedWordCountIT.class"
+ }
+ forkEvery 1
+ maxParallelForks 4
+ systemProperty "beamTestPipelineOptions",
JsonOutput.toJson(preCommitBeamTestPipelineOptions)
+ }
+}
+
+/* Define a common precommit task which depends on all the individual
precommits. */
+task preCommit() {
+ for (String runner : preCommitRunners) {
+ dependsOn runner + "PreCommit"
+ }
+}
+
+compileKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
+}
+compileTestKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
+}
+repositories {
+ mavenCentral()
+}
+
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
new file mode 100644
index 0000000..bd16582
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.metrics.Metrics
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.slf4j.LoggerFactory
+import java.util.regex.Pattern
+
+/**
+ * An example that verifies word counts in Shakespeare and includes Beam best
practices.
+ *
+ *
+ * This class, [DebuggingWordCount], is the third in a series of four
successively more
+ * detailed 'word count' examples. You may first want to take a look at
[MinimalWordCount] and
+ * [WordCount]. After you've looked at this example, then see the
[WindowedWordCount]
+ * pipeline, for introduction of additional concepts.
+ *
+ *
+ * Basic concepts, also in the MinimalWordCount and WordCount examples:
Reading text files;
+ * counting a PCollection; executing a Pipeline both locally and using a
selected runner; defining
+ * DoFns.
+ *
+ *
+ * New Concepts:
+ *
+ * <pre>
+ * 1. Logging using SLF4J, even in a distributed environment
+ * 2. Creating a custom metric (runners have varying levels of support)
+ * 3. Testing your Pipeline via PAssert
+</pre> *
+ *
+ *
+ * To execute this pipeline locally, specify general pipeline configuration:
+ *
+ * <pre>`--project=YOUR_PROJECT_ID
+`</pre> *
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ *
+ * The input file defaults to a public data set containing the text of King
Lear, by William
+ * Shakespeare. You can override it and choose your own input with
`--inputFile`.
+ */
+public object DebuggingWordCount {
+ /** A DoFn that filters for a specific key based upon a regular
expression. */
+ public class FilterTextFn(pattern: String) : DoFn<KV<String, Long>,
KV<String, Long>>() {
+
+ private val filter: Pattern = Pattern.compile(pattern)
+
+ /**
+ * Concept #2: A custom metric can track values in your pipeline as it
runs. Each runner
+ * provides varying levels of support for metrics, and may expose them
in a dashboard, etc.
+ */
+ private val matchedWords = Metrics.counter(FilterTextFn::class.java,
"matchedWords")
+
+ private val unmatchedWords = Metrics.counter(FilterTextFn::class.java,
"unmatchedWords")
+
+ @ProcessElement
+ fun processElement(c: ProcessContext) {
+ if (filter.matcher(c.element().key).matches()) {
+ // Log at the "DEBUG" level each element that we match. When
executing this pipeline
+ // these log lines will appear only if the log level is set to
"DEBUG" or lower.
+ LOG.debug("Matched: ${c.element().key}")
+ matchedWords.inc()
+ c.output(c.element())
+ } else {
+ // Log at the "TRACE" level each element that is not matched.
Different log levels
+ // can be used to control the verbosity of logging providing
an effective mechanism
+ // to filter less important information.
+ LOG.trace("Did not match: ${c.element().key}")
+ unmatchedWords.inc()
+ }
+ }
+
+ companion object {
+ /**
+ * Concept #1: The logger below uses the fully qualified class
name of FilterTextFn as the
+ * logger. Depending on your SLF4J configuration, log statements
will likely be qualified by
+ * this name.
+ *
+ *
+ * Note that this is entirely standard SLF4J usage. Some runners
may provide a default SLF4J
+ * configuration that is most appropriate for their logging
integration.
+ */
+ private val LOG = LoggerFactory.getLogger(FilterTextFn::class.java)
+ }
+ }
+
+ /**
+ * Options supported by [DebuggingWordCount].
+ *
+ *
+ * Inherits standard configuration options and all options defined in
[WordCount.WordCountOptions].
+ */
+ public interface WordCountOptions : WordCount.WordCountOptions {
+
+ @get:Description("Regex filter pattern to use in DebuggingWordCount. "
+ "Only words matching this pattern will be counted.")
+ @get:Default.String("Flourish|stomach")
+ var filterPattern: String
+ }
+
+ @JvmStatic
+ fun runDebuggingWordCount(options: WordCountOptions) {
+ val p = Pipeline.create(options)
+
+ val filteredWords = p.apply("ReadLines",
TextIO.read().from(options.inputFile))
+ .apply(WordCount.CountWords())
+ .apply(ParDo.of(FilterTextFn(options.filterPattern)))
+
+ /*
+ * Concept #3: PAssert is a set of convenient PTransforms in the style of
+ * Hamcrest's collection matchers that can be used when writing Pipeline
level tests
+ * to validate the contents of PCollections. PAssert is best used in unit
tests
+ * with small data sets but is demonstrated here as a teaching tool.
+ *
+ * <p>Below we verify that the set of filtered words matches our expected
counts. Note
+ * that PAssert does not provide any output and that successful completion
of the
+ * Pipeline implies that the expectations were met. Learn more at
+ * https://beam.apache.org/documentation/pipelines/test-your-pipeline/ on
how to test
+ * your Pipeline and see {@link DebuggingWordCountTest} for an example
unit test.
+ */
+ val expectedResults = listOf(KV.of("Flourish", 3L), KV.of("stomach",
1L))
+ PAssert.that(filteredWords).containsInAnyOrder(expectedResults)
+
+ p.run().waitUntilFinish()
+ }
+
+ @JvmStatic
+ fun main(args: Array<String>) {
+ val options = (PipelineOptionsFactory.fromArgs(*args).withValidation()
as WordCountOptions)
+
+ runDebuggingWordCount(options)
+ }
+}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt
new file mode 100644
index 0000000..2c752c4
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.TypeDescriptors
+
+/**
+ * An example that counts words in Shakespeare.
+ *
+ *
+ * This class, [MinimalWordCount], is the first in a series of four
successively more
+ * detailed 'word count' examples. Here, for simplicity, we don't show any
error-checking or
+ * argument processing, and focus on construction of the pipeline, which
chains together the
+ * application of core transforms.
+ *
+ *
+ * Next, see the [WordCount] pipeline, then the [DebuggingWordCount], and
finally the
+ * [WindowedWordCount] pipeline, for more detailed examples that introduce
additional
+ * concepts.
+ *
+ *
+ * Concepts:
+ *
+ * <pre>
+ * 1. Reading data from text files
+ * 2. Specifying 'inline' transforms
+ * 3. Counting items in a PCollection
+ * 4. Writing data to text files
+</pre> *
+ *
+ *
+ * No arguments are required to run this pipeline. It will be executed with
the DirectRunner. You
+ * can see the results in the output files in your current working directory,
with names like
+ * "wordcounts-00001-of-00005. When running on a distributed service, you
would use an appropriate
+ * file service.
+ */
+public object MinimalWordCount {
+
+ @JvmStatic
+ fun main(args: Array<String>) {
+
+ // Create a PipelineOptions object. This object lets us set various
execution
+ // options for our pipeline, such as the runner you wish to use. This
example
+ // will run with the DirectRunner by default, based on the class path
configured
+ // in its dependencies.
+ val options = PipelineOptionsFactory.create()
+
+ // In order to run your pipeline, you need to make following runner
specific changes:
+ //
+ // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
+ // or FlinkRunner.
+ // CHANGE 2/3: Specify runner-required options.
+ // For BlockingDataflowRunner, set project and temp location as
follows:
+ // val dataflowOptions : DataflowPipelineOptions =
options.as(DataflowPipelineOptions::class.java)
+ // dataflowOptions.runner = BlockingDataflowRunner::class.java
+ // dataflowOptions.project = "SET_YOUR_PROJECT_ID_HERE"
+ // dataflowOptions.tempLocation =
"gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"
+ // For FlinkRunner, set the runner as follows. See {@code
FlinkPipelineOptions}
+ // for more details.
+ // options.as(FlinkPipelineOptions::class.java)
+ // .setRunner(FlinkRunner::class.java)
+
+ // Create the Pipeline object with the options we defined above
+ val p = Pipeline.create(options)
+
+ // Concept #1: Apply a root transform to the pipeline; in this case,
TextIO.Read to read a set
+ // of input text files. TextIO.Read returns a PCollection where each
element is one line from
+ // the input text (a set of Shakespeare's texts).
+
+ // This example reads a public data set consisting of the complete
works of Shakespeare.
+
p.apply<PCollection<String>>(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
+
+ // Concept #2: Apply a FlatMapElements transform the
PCollection of text lines.
+ // This transform splits the lines in PCollection<String>,
where each element is an
+ // individual word in Shakespeare's collected texts.
+ .apply(
+ FlatMapElements.into(TypeDescriptors.strings())
+ .via(ProcessFunction<String, List<String>> {
input -> input.split("[^\\p{L}]+").toList() })
+ )
+ // We use a Filter transform to avoid empty word
+ .apply(Filter.by(SerializableFunction<String, Boolean> { input
-> !input.isEmpty() }))
+ // Concept #3: Apply the Count transform to our PCollection of
individual words. The Count
+ // transform returns a new PCollection of key/value pairs,
where each key represents a
+ // unique word in the text. The associated value is the
occurrence count for that word.
+ .apply(Count.perElement<String>())
+ // Apply a MapElements transform that formats our PCollection
of word counts into a
+ // printable string, suitable for writing to an output file.
+ .apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via(ProcessFunction<KV<String, Long>, String>
{ input -> "${input.key} : ${input.value}" })
+ )
+ // Concept #4: Apply a write transform, TextIO.Write, at the
end of the pipeline.
+ // TextIO.Write writes the contents of a PCollection (in this
case, our PCollection of
+ // formatted strings) to a series of text files.
+ //
+ // By default, it will write to a set of files with names like
wordcounts-00001-of-00005
+ .apply(TextIO.write().to("wordcounts"))
+
+ p.run().waitUntilFinish()
+ }
+}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt
new file mode 100644
index 0000000..f57a62b
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.examples.kotlin.common.ExampleBigQueryTableOptions
+import org.apache.beam.examples.kotlin.common.ExampleOptions
+import org.apache.beam.examples.kotlin.common.WriteOneFilePerWindow
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.MapElements
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.transforms.windowing.FixedWindows
+import org.apache.beam.sdk.transforms.windowing.Window
+import org.apache.beam.sdk.values.PDone
+import org.joda.time.Duration
+import org.joda.time.Instant
+import java.io.IOException
+import java.util.concurrent.ThreadLocalRandom
+
+/**
+ * An example that counts words in text, and can run over either unbounded or
bounded input
+ * collections.
+ *
+ *
+ * This class, [WindowedWordCount], is the last in a series of four
successively more
+ * detailed 'word count' examples. First take a look at [MinimalWordCount],
[WordCount],
+ * and [DebuggingWordCount].
+ *
+ *
+ * Basic concepts, also in the MinimalWordCount, WordCount, and
DebuggingWordCount examples:
+ * Reading text files; counting a PCollection; writing to GCS; executing a
Pipeline both locally and
+ * using a selected runner; defining DoFns; user-defined PTransforms; defining
PipelineOptions.
+ *
+ *
+ * New Concepts:
+ *
+ * <pre>
+ * 1. Unbounded and bounded pipeline input modes
+ * 2. Adding timestamps to data
+ * 3. Windowing
+ * 4. Re-using PTransforms over windowed PCollections
+ * 5. Accessing the window of an element
+ * 6. Writing data to per-window text files
+</pre> *
+ *
+ *
+ * By default, the examples will run with the `DirectRunner`. To change the
runner,
+ * specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ * See examples/kotlin/README.md for instructions about how to configure
different runners.
+ *
+ *
+ * To execute this pipeline locally, specify a local output file (if using the
`DirectRunner`) or output prefix on a supported distributed file system.
+ *
+ * <pre>`--output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+`</pre> *
+ *
+ *
+ * The input file defaults to a public data set containing the text of King
Lear, by William
+ * Shakespeare. You can override it and choose your own input with
`--inputFile`.
+ *
+ *
+ * By default, the pipeline will do fixed windowing, on 10-minute windows. You
can change this
+ * interval by setting the `--windowSize` parameter, e.g. `--windowSize=15` for
+ * 15-minute windows.
+ *
+ *
+ * The example will try to cancel the pipeline on the signal to terminate the
process (CTRL-C).
+ */
+public object WindowedWordCount {
+ const val WINDOW_SIZE = 10 // Default window duration in minutes
+
+ /**
+ * Concept #2: A DoFn that sets the data element timestamp. This is a
silly method, just for this
+ * example, for the bounded data case.
+ *
+ *
+ * Imagine that many ghosts of Shakespeare are all typing madly at the
same time to recreate
+ * his masterworks. Each line of the corpus will get a random associated
timestamp somewhere in a
+ * 2-hour period.
+ */
+ public class AddTimestampFn(private val minTimestamp: Instant, private val
maxTimestamp: Instant) : DoFn<String, String>() {
+
+ @ProcessElement
+ fun processElement(@Element element: String, receiver:
DoFn.OutputReceiver<String>) {
+ val randomTimestamp = Instant(
+ ThreadLocalRandom.current()
+ .nextLong(minTimestamp.millis,
maxTimestamp.millis))
+
+ /*
+ * Concept #2: Set the data element with that timestamp.
+ */
+ receiver.outputWithTimestamp(element, Instant(randomTimestamp))
+ }
+ }
+
+ /** A [DefaultValueFactory] that returns the current system time. */
+ public class DefaultToCurrentSystemTime : DefaultValueFactory<Long> {
+ override fun create(options: PipelineOptions): Long? {
+ return System.currentTimeMillis()
+ }
+ }
+
+ /** A [DefaultValueFactory] that returns the minimum timestamp plus one
hour. */
+ public class DefaultToMinTimestampPlusOneHour : DefaultValueFactory<Long> {
+ override fun create(options: PipelineOptions): Long? {
+ return (options as Options).minTimestampMillis!! +
Duration.standardHours(1).millis
+ }
+ }
+
+ /**
+ * Options for [WindowedWordCount].
+ *
+ *
+ * Inherits standard example configuration options, which allow
specification of the runner, as
+ * well as the [WordCount.WordCountOptions] support for specification of
the input and
+ * output files.
+ */
+ public interface Options : WordCount.WordCountOptions, ExampleOptions,
ExampleBigQueryTableOptions {
+ @get:Description("Fixed window duration, in minutes")
+ @get:Default.Integer(WINDOW_SIZE)
+ var windowSize: Int?
+
+ @get:Description("Minimum randomly assigned timestamp, in
milliseconds-since-epoch")
+ @get:Default.InstanceFactory(DefaultToCurrentSystemTime::class)
+ var minTimestampMillis: Long?
+
+ @get:Description("Maximum randomly assigned timestamp, in
milliseconds-since-epoch")
+ @get:Default.InstanceFactory(DefaultToMinTimestampPlusOneHour::class)
+ var maxTimestampMillis: Long?
+
+ @get:Description("Fixed number of shards to produce per window")
+ var numShards: Int?
+ }
+
+ @Throws(IOException::class)
+ @JvmStatic
+ fun runWindowedWordCount(options: Options) {
+ val output = options.output
+ val minTimestamp = Instant(options.minTimestampMillis)
+ val maxTimestamp = Instant(options.maxTimestampMillis)
+
+ val pipeline = Pipeline.create(options)
+
+ /*
+ * Concept #1: the Beam SDK lets us run the same pipeline with either a
bounded or
+ * unbounded input source.
+ */
+ val input = pipeline
+ /* Read from the GCS file. */
+ .apply(TextIO.read().from(options.inputFile))
+ // Concept #2: Add an element timestamp, using an artificial
time just to show
+ // windowing.
+ // See AddTimestampFn for more detail on this.
+ .apply(ParDo.of(AddTimestampFn(minTimestamp, maxTimestamp)))
+
+ /*
+ * Concept #3: Window into fixed windows. The fixed window size for this
example defaults to 1
+ * minute (you can change this with a command-line option). See the
documentation for more
+ * information on how fixed windows work, and for information on the other
types of windowing
+ * available (e.g., sliding windows).
+ */
+ val windowedWords = input.apply(
+
Window.into<String>(FixedWindows.of(Duration.standardMinutes(options.windowSize!!.toLong()))))
+
+ /*
+ * Concept #4: Re-use our existing CountWords transform that does not have
knowledge of
+ * windows over a PCollection containing windowed values.
+ */
+ val wordCounts = windowedWords.apply(WordCount.CountWords())
+
+ /*
+ * Concept #5: Format the results and write to a sharded file partitioned
by window, using a
+ * simple ParDo operation. Because there may be failures followed by
retries, the
+ * writes must be idempotent, but the details of writing to files is
elided here.
+ */
+ wordCounts
+ .apply(MapElements.via(WordCount.FormatAsTextFn()))
+ .apply<PDone>(WriteOneFilePerWindow(output, options.numShards))
+
+ val result = pipeline.run()
+ try {
+ result.waitUntilFinish()
+ } catch (exc: Exception) {
+ result.cancel()
+ }
+
+ }
+
+ @Throws(IOException::class)
+ @JvmStatic
+ fun main(args: Array<String>) {
+ val options = (PipelineOptionsFactory.fromArgs(*args).withValidation()
as Options)
+ runWindowedWordCount(options)
+ }
+}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt
new file mode 100644
index 0000000..9cf6d3f
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.examples.kotlin.common.ExampleUtils
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.metrics.Metrics
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.Validation.Required
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PDone
+
+/**
+ * An example that counts words in Shakespeare and includes Beam best
practices.
+ *
+ *
+ * This class, [WordCount], is the second in a series of four successively
more detailed
+ * 'word count' examples. You may first want to take a look at
[MinimalWordCount]. After
+ * you've looked at this example, then see the [DebuggingWordCount] pipeline,
for introduction
+ * of additional concepts.
+ *
+ *
+ * For a detailed walkthrough of this example, see [
+ * https://beam.apache.org/get-started/wordcount-example/
](https://beam.apache.org/get-started/wordcount-example/)
+ *
+ *
+ * Basic concepts, also in the MinimalWordCount example: Reading text files;
counting a
+ * PCollection; writing to text files
+ *
+ *
+ * New Concepts:
+ *
+ * <pre>
+ * 1. Executing a Pipeline both locally and using the selected runner
+ * 2. Using ParDo with static DoFns defined out-of-line
+ * 3. Building a composite transform
+ * 4. Defining your own pipeline options
+</pre> *
+ *
+ *
+ * Concept #1: you can execute this pipeline either locally or using by
selecting another runner.
+ * These are now command-line options and not hard-coded as they were in the
MinimalWordCount
+ * example.
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ *
+ * To execute this pipeline, specify a local output file (if using the
`DirectRunner`) or
+ * output prefix on a supported distributed file system.
+ *
+ * <pre>`--output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+`</pre> *
+ *
+ *
+ * The input file defaults to a public data set containing the text of of King
Lear, by William
+ * Shakespeare. You can override it and choose your own input with
`--inputFile`.
+ */
public object WordCount {

    /**
     * Concept #2: you can make your pipeline assembly code less verbose by defining
     * your DoFns statically out-of-line. This DoFn tokenizes lines of text into
     * individual words; we pass it to a ParDo in the pipeline.
     */
    public class ExtractWordsFn : DoFn<String, String>() {
        // Counter of input lines that contained no words at all.
        private val emptyLines = Metrics.counter(ExtractWordsFn::class.java, "emptyLines")
        // Distribution of input line lengths, useful for monitoring.
        private val lineLenDist = Metrics.distribution(ExtractWordsFn::class.java, "lineLenDistro")

        @ProcessElement
        fun processElement(@Element element: String, receiver: DoFn.OutputReceiver<String>) {
            lineLenDist.update(element.length.toLong())
            // FIX: use trim() rather than trim(' ') so that lines consisting only of
            // tabs or other whitespace are also counted as empty, matching the Java
            // example's element.trim().isEmpty() behavior.
            if (element.trim().isEmpty()) {
                emptyLines.inc()
            }

            // Split the line into words on any run of non-letter characters.
            val words = element.split(ExampleUtils.TOKENIZER_PATTERN.toRegex()).toTypedArray()

            // Output each non-empty word encountered into the output PCollection.
            for (word in words) {
                if (word.isNotEmpty()) {
                    receiver.output(word)
                }
            }
        }
    }

    /** A SimpleFunction that converts a Word and Count into a printable string. */
    public class FormatAsTextFn : SimpleFunction<KV<String, Long>, String>() {
        override fun apply(input: KV<String, Long>): String {
            return "${input.key} : ${input.value}"
        }
    }

    /**
     * A PTransform that converts a PCollection containing lines of text into a
     * PCollection of formatted word counts.
     *
     * Concept #3: this is a custom composite transform that bundles two transforms
     * (ParDo and Count) as a reusable PTransform subclass. Using composite transforms
     * allows for easy reuse, modular testing, and an improved monitoring experience.
     */
    public class CountWords : PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
        override fun expand(lines: PCollection<String>): PCollection<KV<String, Long>> {
            // Convert lines of text into individual words.
            val words = lines.apply(ParDo.of(ExtractWordsFn()))

            // Count the number of times each word occurs.
            return words.apply(Count.perElement())
        }
    }

    /**
     * Options supported by [WordCount].
     *
     * Concept #4: defining your own configuration options. Here, you can add your own
     * arguments to be processed by the command-line parser, and specify default values
     * for them. You can then access the options values in your pipeline code.
     *
     * Inherits standard configuration options.
     */
    public interface WordCountOptions : PipelineOptions {

        /**
         * By default, this example reads from a public dataset containing the text of
         * King Lear. Set this option to choose a different input file or glob.
         */
        @get:Description("Path of the file to read from")
        @get:Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        var inputFile: String

        /** Set this required option to specify where to write the output. */
        @get:Description("Path of the file to write to")
        @get:Required
        var output: String
    }

    /** Builds and synchronously runs the word-count pipeline for [options]. */
    @JvmStatic
    fun runWordCount(options: WordCountOptions) {
        val p = Pipeline.create(options)

        // Concepts #2 and #3: Our pipeline applies the composite CountWords transform,
        // and passes the static FormatAsTextFn() to the ParDo transform.
        p.apply("ReadLines", TextIO.read().from(options.inputFile))
            .apply(CountWords())
            .apply(MapElements.via(FormatAsTextFn()))
            .apply<PDone>("WriteCounts", TextIO.write().to(options.output))

        p.run().waitUntilFinish()
    }

    @JvmStatic
    fun main(args: Array<String>) {
        // BUG FIX: withValidation() returns a PipelineOptionsFactory.Builder, not a
        // WordCountOptions, so the previous plain Kotlin cast (`as WordCountOptions`)
        // threw ClassCastException at runtime. Use the builder's `as(Class)` method
        // instead; backticks escape the Kotlin keyword `as`.
        val options = PipelineOptionsFactory.fromArgs(*args)
            .withValidation()
            .`as`(WordCountOptions::class.java)
        runWordCount(options)
    }
}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt
new file mode 100644
index 0000000..176ff7a
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.DefaultValueFactory
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+
+/**
+ * Options that can be used to configure BigQuery tables in Beam examples. The
project defaults to
+ * the project being used to run the example.
+ */
interface ExampleBigQueryTableOptions : GcpOptions {
    /** Name of the BigQuery dataset the example writes into. */
    @get:Description("BigQuery dataset name")
    @get:Default.String("beam_examples")
    var bigQueryDataset: String

    /** Name of the BigQuery table; defaults to a name derived from the job name. */
    @get:Description("BigQuery table name")
    @get:Default.InstanceFactory(BigQueryTableFactory::class)
    var bigQueryTable: String

    /** Schema applied when the example creates the table. */
    @get:Description("BigQuery table schema")
    var bigQuerySchema: TableSchema

    /**
     * Derives the default table name from the pipeline's job name, replacing
     * hyphens (illegal in BigQuery table names) with underscores.
     */
    class BigQueryTableFactory : DefaultValueFactory<String> {
        override fun create(options: PipelineOptions): String =
            options.jobName.replace('-', '_')
    }
}
diff --git a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleOptions.kt
similarity index 57%
copy from .test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
copy to
examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleOptions.kt
index e02dd12..f8bb4a1 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleOptions.kt
@@ -15,24 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.beam.examples.kotlin.common
-import PrecommitJobBuilder
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
-PrecommitJobBuilder builder = new PrecommitJobBuilder(
- scope: this,
- nameBase: 'Java_Examples_Dataflow',
- gradleTask: ':javaExamplesDataflowPreCommit',
- gradleSwitches: ['-PdisableSpotlessCheck=true'], // spotless checked in
separate pre-commit
- triggerPathPatterns: [
- '^model/.*$',
- '^sdks/java/.*$',
- '^runners/google-cloud-dataflow-java/.*$',
- '^examples/java/.*$',
- '^release/.*$',
- ]
-)
-builder.build {
- publishers {
- archiveJunit('**/build/test-results/**/*.xml')
- }
/** Options that can be used to configure the Beam examples. */
interface ExampleOptions : PipelineOptions {
    // When false (the default), ExampleUtils.waitToFinish installs a shutdown hook
    // that tears down resources and cancels the registered pipelines on process exit.
    @get:Description("Whether to keep jobs running after local process exit")
    @get:Default.Boolean(false)
    var keepJobsRunning: Boolean

    // Worker count for the injector pipeline used by some streaming examples.
    @get:Description("Number of workers to use when executing the injector pipeline")
    @get:Default.Integer(1)
    var injectorNumWorkers: Int
}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicAndSubscriptionOptions.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicAndSubscriptionOptions.kt
new file mode 100644
index 0000000..1050f47
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicAndSubscriptionOptions.kt
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.DefaultValueFactory
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+
/** Options that can be used to configure Pub/Sub topic/subscription in Beam examples. */
interface ExamplePubsubTopicAndSubscriptionOptions : ExamplePubsubTopicOptions {
    /** Full subscription path; defaults to one derived from the project and job names. */
    @get:Description("Pub/Sub subscription")
    @get:Default.InstanceFactory(PubsubSubscriptionFactory::class)
    var pubsubSubscription: String

    /** Returns a default Pub/Sub subscription based on the project and the job names. */
    class PubsubSubscriptionFactory : DefaultValueFactory<String> {
        override fun create(options: PipelineOptions): String =
            // BUG FIX: a Pub/Sub subscription path must be the single-line resource
            // name "projects/<project>/subscriptions/<name>". The previous multi-line
            // raw string kept its embedded newlines after trimIndent(), producing an
            // invalid resource name.
            "projects/${(options as GcpOptions).project}/subscriptions/${options.jobName}"
    }
}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicOptions.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicOptions.kt
new file mode 100644
index 0000000..341aae5
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicOptions.kt
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.DefaultValueFactory
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+
/** Options that can be used to configure Pub/Sub topic in Beam examples. */
interface ExamplePubsubTopicOptions : GcpOptions {
    /** Full topic path; defaults to one derived from the project and job names. */
    @get:Description("Pub/Sub topic")
    @get:Default.InstanceFactory(PubsubTopicFactory::class)
    var pubsubTopic: String

    /** Returns a default Pub/Sub topic based on the project and the job names. */
    class PubsubTopicFactory : DefaultValueFactory<String> {
        override fun create(options: PipelineOptions): String =
            // BUG FIX: a Pub/Sub topic path must be the single-line resource name
            // "projects/<project>/topics/<name>". The previous multi-line raw string
            // both kept its embedded newlines after trimIndent() AND contained a
            // stray '"' after "projects/", producing an invalid resource name.
            "projects/${(options as GcpOptions).project}/topics/${options.jobName}"
    }
}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleUtils.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleUtils.kt
new file mode 100644
index 0000000..8144a11
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleUtils.kt
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest
+import com.google.api.client.http.HttpRequestInitializer
+import com.google.api.services.bigquery.Bigquery
+import com.google.api.services.bigquery.model.*
+import com.google.api.services.pubsub.Pubsub
+import com.google.api.services.pubsub.model.Subscription
+import com.google.api.services.pubsub.model.Topic
+import com.google.auth.Credentials
+import com.google.auth.http.HttpCredentialsAdapter
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer
+import org.apache.beam.sdk.PipelineResult
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer
+import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer
+import org.apache.beam.sdk.extensions.gcp.util.Transport
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
+import org.apache.beam.sdk.options.PipelineOptions
+import org.apache.beam.sdk.util.BackOffUtils
+import org.apache.beam.sdk.util.FluentBackoff
+import org.apache.beam.sdk.util.Sleeper
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles
+import org.joda.time.Duration
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+/**
+ * The utility class that sets up and tears down external resources, and
cancels the streaming
+ * pipelines once the program terminates.
+ *
+ *
+ * It is used to run Beam examples.
+ */
class ExampleUtils
/** Do resources and runner options setup. */
(private val options: PipelineOptions) {
    // Google API clients are built lazily so they are only constructed (and the
    // options only cast) when a setup/teardown path actually needs them.
    private val bigQueryClient: Bigquery by lazy {
        newBigQueryClient(options as BigQueryOptions).build()
    }
    private val pubsubClient: Pubsub by lazy {
        newPubsubClient(options as PubsubOptions).build()
    }
    // Streaming pipelines registered via waitToFinish(); cancelled by the shutdown hook.
    private val pipelinesToCancel = Sets.newHashSet<PipelineResult>()
    // Status messages buffered here and flushed to stdout by printPendingMessages().
    private val pendingMessages = Lists.newArrayList<String>()

    /**
     * Sets up external resources that are required by the example, such as Pub/Sub topics and
     * BigQuery tables.
     *
     * @throws IOException if there is a problem setting up the resources
     */
    @Throws(IOException::class)
    fun setup() {
        val sleeper = Sleeper.DEFAULT
        // Retry transient API failures up to 3 times, starting at 200ms backoff.
        val backOff = FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff()
        var lastException: Throwable? = null
        try {
            do {
                try {
                    setupPubsub()
                    setupBigQueryTable()
                    return
                } catch (e: GoogleJsonResponseException) {
                    // Remember the failure and let the backoff loop decide on retry.
                    lastException = e
                }

            } while (BackOffUtils.next(sleeper, backOff))
        } catch (e: InterruptedException) {
            Thread.currentThread().interrupt()
            // Ignore InterruptedException
        }

        // Retries exhausted (or interrupted): surface the last API failure.
        throw RuntimeException(lastException)
    }

    /**
     * Sets up the Google Cloud Pub/Sub topic.
     *
     * If the topic doesn't exist, a new topic with the given name will be created.
     *
     * @throws IOException if there is a problem setting up the Pub/Sub topic
     */
    @Throws(IOException::class)
    fun setupPubsub() {
        val pubsubOptions = options as ExamplePubsubTopicAndSubscriptionOptions
        if (pubsubOptions.pubsubTopic.isNotEmpty()) {
            pendingMessages.add("**********************Set Up Pubsub************************")
            setupPubsubTopic(pubsubOptions.pubsubTopic)
            pendingMessages.add(
                    "The Pub/Sub topic has been set up for this example: ${pubsubOptions.pubsubTopic}")

            // The subscription is only created when one was configured alongside the topic.
            if (pubsubOptions.pubsubSubscription.isNotEmpty()) {
                setupPubsubSubscription(
                        pubsubOptions.pubsubTopic, pubsubOptions.pubsubSubscription)
                pendingMessages.add(
                        "The Pub/Sub subscription has been set up for this example: ${pubsubOptions.pubsubSubscription}")
            }
        }
    }

    /**
     * Sets up the BigQuery table with the given schema.
     *
     * If the table already exists, the schema has to match the given one. Otherwise, the example
     * will throw a RuntimeException. If the table doesn't exist, a new table with the given schema
     * will be created.
     *
     * @throws IOException if there is a problem setting up the BigQuery table
     */
    @Throws(IOException::class)
    fun setupBigQueryTable() {
        val bigQueryTableOptions = options as ExampleBigQueryTableOptions
        // NOTE(review): these properties are declared non-null in Kotlin, so the
        // compiler flags these checks as always-true; the underlying PipelineOptions
        // proxy presumably can still return null for unset values — confirm before
        // removing the checks.
        if (bigQueryTableOptions.bigQueryDataset != null
                && bigQueryTableOptions.bigQueryTable != null
                && bigQueryTableOptions.bigQuerySchema != null) {
            pendingMessages.add("******************Set Up Big Query Table*******************")
            setupBigQueryTable(
                    bigQueryTableOptions.project,
                    bigQueryTableOptions.bigQueryDataset,
                    bigQueryTableOptions.bigQueryTable,
                    bigQueryTableOptions.bigQuerySchema)
            // NOTE(review): trimIndent() keeps the embedded newlines, so this message
            // prints across several lines rather than as "project:dataset.table".
            pendingMessages.add(
                    """
                    The BigQuery table has been set up for this example:
                    ${bigQueryTableOptions.project}:
                    ${bigQueryTableOptions.bigQueryDataset}.
                    ${bigQueryTableOptions.bigQueryTable}
                    """.trimIndent()
            )
        }
    }

    /** Tears down external resources that can be deleted upon the example's completion. */
    private fun tearDown() {
        pendingMessages.add("*************************Tear Down*************************")
        val pubsubOptions = options as ExamplePubsubTopicAndSubscriptionOptions
        if (pubsubOptions.pubsubTopic.isNotEmpty()) {
            try {
                deletePubsubTopic(pubsubOptions.pubsubTopic)
                pendingMessages.add(
                        "The Pub/Sub topic has been deleted: ${pubsubOptions.pubsubTopic}")
            } catch (e: IOException) {
                // Best-effort cleanup: report the failure but keep tearing down.
                pendingMessages.add(
                        "Failed to delete the Pub/Sub topic : ${pubsubOptions.pubsubTopic}")
            }

            if (pubsubOptions.pubsubSubscription.isNotEmpty()) {
                try {
                    deletePubsubSubscription(pubsubOptions.pubsubSubscription)
                    pendingMessages.add(
                            "The Pub/Sub subscription has been deleted: ${pubsubOptions.pubsubSubscription}")
                } catch (e: IOException) {
                    pendingMessages.add(
                            "Failed to delete the Pub/Sub subscription : ${pubsubOptions.pubsubSubscription}")
                }

            }
        }

        // The BigQuery table is deliberately NOT deleted — it may hold the example's output.
        val bigQueryTableOptions = options as ExampleBigQueryTableOptions
        if (bigQueryTableOptions.bigQueryDataset != null
                && bigQueryTableOptions.bigQueryTable != null
                && bigQueryTableOptions.bigQuerySchema != null) {
            pendingMessages.add(
                    """
                    The BigQuery table might contain the example's output, and it is not deleted automatically:
                    ${bigQueryTableOptions.project}:
                    ${bigQueryTableOptions.bigQueryDataset}.
                    ${bigQueryTableOptions.bigQueryTable}
                    """.trimIndent())
            pendingMessages.add(
                    "Please go to the Developers Console to delete it manually. Otherwise, you may be charged for its usage.")
        }
    }

    /**
     * Ensures the dataset and table exist, creating them if needed; throws if the
     * table exists with a different schema.
     */
    @Throws(IOException::class)
    private fun setupBigQueryTable(
            projectId: String, datasetId: String, tableId: String, schema: TableSchema) {

        // Create the dataset first if it does not exist yet.
        val datasetService = bigQueryClient.datasets()
        if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == null) {
            val newDataset = Dataset()
                    .setDatasetReference(
                            DatasetReference().setProjectId(projectId).setDatasetId(datasetId))
            datasetService.insert(projectId, newDataset).execute()
        }

        val tableService = bigQueryClient.tables()
        val table = executeNullIfNotFound(tableService.get(projectId, datasetId, tableId))
        if (table == null) {
            val newTable = Table()
                    .setSchema(schema)
                    .setTableReference(
                            TableReference()
                                    .setProjectId(projectId)
                                    .setDatasetId(datasetId)
                                    .setTableId(tableId))
            tableService.insert(projectId, datasetId, newTable).execute()
        } else if (table.schema != schema) {
            // An existing table must match the requested schema exactly.
            throw RuntimeException(
                    """
                    Table exists and schemas do not match, expecting:
                    ${schema.toPrettyString()}, actual:
                    ${table.schema.toPrettyString()}
                    """.trimIndent()
            )
        }
    }

    // Creates the Pub/Sub topic (fails if it already exists — callers retry via setup()).
    @Throws(IOException::class)
    private fun setupPubsubTopic(topic: String) {
        pubsubClient.projects().topics().create(topic, Topic().setName(topic)).execute()
    }

    // Creates the Pub/Sub subscription with a 60-second ack deadline.
    @Throws(IOException::class)
    private fun setupPubsubSubscription(topic: String, subscription: String) {
        val subInfo = Subscription().setAckDeadlineSeconds(60).setTopic(topic)
        pubsubClient.projects().subscriptions().create(subscription, subInfo).execute()
    }

    /**
     * Deletes the Google Cloud Pub/Sub topic.
     *
     * @throws IOException if there is a problem deleting the Pub/Sub topic
     */
    @Throws(IOException::class)
    private fun deletePubsubTopic(topic: String) {
        // Only delete if it still exists; a 404 on the GET means nothing to do.
        if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) != null) {
            pubsubClient.projects().topics().delete(topic).execute()
        }
    }

    /**
     * Deletes the Google Cloud Pub/Sub subscription.
     *
     * @throws IOException if there is a problem deleting the Pub/Sub subscription
     */
    @Throws(IOException::class)
    private fun deletePubsubSubscription(subscription: String) {
        if (executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription)) != null) {
            pubsubClient.projects().subscriptions().delete(subscription).execute()
        }
    }

    /** Waits for the pipeline to finish and cancels it before the program exits. */
    fun waitToFinish(result: PipelineResult) {
        pipelinesToCancel.add(result)
        if (!(options as ExampleOptions).keepJobsRunning) {
            addShutdownHook(pipelinesToCancel)
        }
        try {
            result.waitUntilFinish()
        } catch (e: UnsupportedOperationException) {
            // Do nothing if the given PipelineResult doesn't support waitUntilFinish(),
            // such as EvaluationResults returned by DirectRunner.
            tearDown()
            printPendingMessages()
        } catch (e: Exception) {
            // NOTE(review): the original cause `e` is dropped here; consider passing
            // it to the RuntimeException constructor to keep the stack trace.
            throw RuntimeException("Failed to wait the pipeline until finish: $result")
        }

    }

    // Registers a JVM shutdown hook that tears down resources, flushes buffered
    // messages, cancels every registered pipeline, and then polls for termination.
    private fun addShutdownHook(pipelineResults: Collection<PipelineResult>) {
        Runtime.getRuntime()
                .addShutdownHook(
                        Thread {
                            tearDown()
                            printPendingMessages()
                            for (pipelineResult in pipelineResults) {
                                try {
                                    pipelineResult.cancel()
                                } catch (e: IOException) {
                                    println("Failed to cancel the job.")
                                    println(e.message)
                                }

                            }

                            // Poll up to 6 times, 10 seconds apart, for each pipeline
                            // to reach a terminal state after cancellation.
                            for (pipelineResult in pipelineResults) {
                                var cancellationVerified = false
                                for (retryAttempts in 6 downTo 1) {
                                    if (pipelineResult.state.isTerminal) {
                                        cancellationVerified = true
                                        break
                                    } else {
                                        println(
                                                "The example pipeline is still running. Verifying the cancellation.")
                                    }
                                    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS)
                                }
                                if (!cancellationVerified) {
                                    println(
                                            "Failed to verify the cancellation for job: $pipelineResult")
                                }
                            }
                        })
    }

    // Flushes all buffered status messages to stdout, framed by separator lines.
    private fun printPendingMessages() {
        println()
        println("***********************************************************")
        println("***********************************************************")
        for (message in pendingMessages) {
            println(message)
        }
        println("***********************************************************")
        println("***********************************************************")
    }

    companion object {

        private const val SC_NOT_FOUND = 404

        /**
         * \p{L} denotes the category of Unicode letters, so this pattern will match on everything that is
         * not a letter.
         *
         * It is used for tokenizing strings in the wordcount examples.
         */
        const val TOKENIZER_PATTERN = "[^\\p{L}]+"

        /** Returns a BigQuery client builder using the specified [BigQueryOptions]. */
        private fun newBigQueryClient(options: BigQueryOptions): Bigquery.Builder {
            return Bigquery.Builder(
                    Transport.getTransport(),
                    Transport.getJsonFactory(),
                    chainHttpRequestInitializer(
                            options.gcpCredential,
                            // Do not log 404. It clutters the output and is possibly even required by the
                            // caller.
                            RetryHttpRequestInitializer(ImmutableList.of(404))))
                    .setApplicationName(options.appName)
                    .setGoogleClientRequestInitializer(options.googleApiTrace)
        }

        /** Returns a Pubsub client builder using the specified [PubsubOptions]. */
        private fun newPubsubClient(options: PubsubOptions): Pubsub.Builder {
            return Pubsub.Builder(
                    Transport.getTransport(),
                    Transport.getJsonFactory(),
                    chainHttpRequestInitializer(
                            options.gcpCredential,
                            // Do not log 404. It clutters the output and is possibly even required by the
                            // caller.
                            RetryHttpRequestInitializer(ImmutableList.of(404))))
                    .setRootUrl(options.pubsubRootUrl)
                    .setApplicationName(options.appName)
                    .setGoogleClientRequestInitializer(options.googleApiTrace)
        }

        // Chains the retry initializer behind either real credentials or a
        // NullCredentialInitializer when no credential is available.
        private fun chainHttpRequestInitializer(
                credential: Credentials?, httpRequestInitializer: HttpRequestInitializer): HttpRequestInitializer {

            return credential?.let {
                ChainingHttpRequestInitializer(
                        HttpCredentialsAdapter(credential), httpRequestInitializer)
            } ?: kotlin.run {
                ChainingHttpRequestInitializer(
                        NullCredentialInitializer(), httpRequestInitializer)
            }
        }

        // Executes the request, translating a 404 response into null ("not found")
        // and re-throwing every other API error.
        @Throws(IOException::class)
        private fun <T> executeNullIfNotFound(request: AbstractGoogleClientRequest<T>): T? {
            return try {
                request.execute()
            } catch (e: GoogleJsonResponseException) {
                if (e.statusCode == SC_NOT_FOUND) {
                    null
                } else {
                    throw e
                }
            }

        }
    }
}
diff --git
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt
new file mode 100644
index 0000000..1e12a77
--- /dev/null
+++
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import org.apache.beam.sdk.io.FileBasedSink
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
+import org.apache.beam.sdk.io.fs.ResourceId
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow
+import org.apache.beam.sdk.transforms.windowing.PaneInfo
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PDone
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull
+import org.joda.time.format.ISODateTimeFormat
+
/**
 * A [PTransform] that writes each window of its input strings to a file whose name is
 * deterministically derived from the lower and upper bounds of the window (an [IntervalWindow]).
 *
 *
 * This is test utility code, not for end-users, so examples can be focused on their primary
 * lessons.
 */
class WriteOneFilePerWindow(private val filenamePrefix: String, private val numShards: Int?) :
    PTransform<PCollection<String>, PDone>() {

    override fun expand(input: PCollection<String>): PDone {
        val prefixResource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)
        val baseWrite = TextIO.write()
            .to(PerWindowFiles(prefixResource))
            .withTempDirectory(prefixResource.currentDirectory)
            .withWindowedWrites()
        // Pin the shard count only when the caller requested one; otherwise the runner
        // chooses dynamically.
        val write = if (numShards == null) baseWrite else baseWrite.withNumShards(numShards)
        return input.apply(write)
    }

    /**
     * A [FilenamePolicy] produces a base file name for a write based on metadata about the data
     * being written. This always includes the shard number and the total number of shards. For
     * windowed writes, it also includes the window and pane index (a sequence number assigned to
     * each trigger firing).
     */
    class PerWindowFiles(private val baseFilename: ResourceId) : FilenamePolicy() {

        /** Derives the per-window prefix: `<base>-<windowStart>-<windowEnd>` (hour:minute). */
        fun filenamePrefixForWindow(window: IntervalWindow): String {
            // A directory resource has no filename component, so fall back to an empty prefix.
            val prefix =
                if (baseFilename.isDirectory) "" else firstNonNull(baseFilename.filename, "")
            return String.format(
                "%s-%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()))
        }

        override fun windowedFilename(
            shardNumber: Int,
            numShards: Int,
            window: BoundedWindow,
            paneInfo: PaneInfo,
            outputFileHints: OutputFileHints): ResourceId {
            // This utility is only used with fixed interval windows.
            val intervalWindow = window as IntervalWindow
            val shardedName = String.format(
                "%s-%s-of-%s%s",
                filenamePrefixForWindow(intervalWindow),
                shardNumber,
                numShards,
                outputFileHints.suggestedFilenameSuffix)
            return baseFilename
                .currentDirectory
                .resolve(shardedName, StandardResolveOptions.RESOLVE_FILE)
        }

        override fun unwindowedFilename(
            shardNumber: Int, numShards: Int, outputFileHints: OutputFileHints): ResourceId? {
            // This policy is exclusively for windowed writes.
            throw UnsupportedOperationException("Unsupported.")
        }
    }

    companion object {
        // Joda formatter for hour:minute; used for both window bounds in the prefix.
        private val FORMATTER = ISODateTimeFormat.hourMinute()
    }
}
diff --git
a/examples/kotlin/src/test/java/org/apache/beam/examples/DebuggingWordCountTestKotlin.java
b/examples/kotlin/src/test/java/org/apache/beam/examples/DebuggingWordCountTestKotlin.java
new file mode 100644
index 0000000..4da3b35
--- /dev/null
+++
b/examples/kotlin/src/test/java/org/apache/beam/examples/DebuggingWordCountTestKotlin.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.examples.kotlin.DebuggingWordCount;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Files;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link org.apache.beam.examples.kotlin.DebuggingWordCount}. */
+@RunWith(JUnit4.class)
+public class DebuggingWordCountTestKotlin {
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ private String getFilePath(String filePath) {
+ if (filePath.contains(":")) {
+ return filePath.replace("\\", "/").split(":", -1)[1];
+ }
+ return filePath;
+ }
+
+ @Test
+ public void testDebuggingWordCount() throws Exception {
+ File inputFile = tmpFolder.newFile();
+ File outputFile = tmpFolder.newFile();
+ Files.write(
+ "stomach secret Flourish message Flourish here Flourish",
+ inputFile,
+ StandardCharsets.UTF_8);
+ DebuggingWordCount.WordCountOptions options =
+
TestPipeline.testingPipelineOptions().as(DebuggingWordCount.WordCountOptions.class);
+ options.setInputFile(getFilePath(inputFile.getAbsolutePath()));
+ options.setOutput(getFilePath(outputFile.getAbsolutePath()));
+ DebuggingWordCount.runDebuggingWordCount(options);
+ }
+}
diff --git
a/examples/kotlin/src/test/java/org/apache/beam/examples/MinimalWordCountTestKotlin.java
b/examples/kotlin/src/test/java/org/apache/beam/examples/MinimalWordCountTestKotlin.java
new file mode 100644
index 0000000..8c029ac
--- /dev/null
+++
b/examples/kotlin/src/test/java/org/apache/beam/examples/MinimalWordCountTestKotlin.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * To keep {@link org.apache.beam.examples.kotlin.MinimalWordCount} simple, it
is not factored or
+ * testable. This test file should be maintained with a copy of its code for a
basic smoke test.
+ */
+@RunWith(JUnit4.class)
+public class MinimalWordCountTestKotlin implements Serializable {
+
+ @Rule public TestPipeline p =
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ /** A basic smoke test that ensures there is no crash at pipeline
construction time. */
+ @Test
+ public void testMinimalWordCount() throws Exception {
+ p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
+
+ p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
+ .apply(
+ FlatMapElements.into(TypeDescriptors.strings())
+ .via((String word) ->
Arrays.asList(word.split("[^a-zA-Z']+"))))
+ .apply(Filter.by((String word) -> !word.isEmpty()))
+ .apply(Count.perElement())
+ .apply(
+ MapElements.into(TypeDescriptors.strings())
+ .via(
+ (KV<String, Long> wordCount) ->
+ wordCount.getKey() + ": " + wordCount.getValue()))
+ .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
+ }
+
+ private GcsUtil buildMockGcsUtil() throws IOException {
+ GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+ // Any request to open gets a new bogus channel
+ Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+ .then(
+ invocation ->
+ FileChannel.open(
+ Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.DELETE_ON_CLOSE));
+
+ // Any request for expansion returns a list containing the original GcsPath
+ // This is required to pass validation that occurs in TextIO during apply()
+ Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+ .then(invocation -> ImmutableList.of((GcsPath)
invocation.getArguments()[0]));
+
+ return mockGcsUtil;
+ }
+}
diff --git
a/examples/kotlin/src/test/java/org/apache/beam/examples/WindowedWordCountITKotlin.kt
b/examples/kotlin/src/test/java/org/apache/beam/examples/WindowedWordCountITKotlin.kt
new file mode 100644
index 0000000..8d25bd4
--- /dev/null
+++
b/examples/kotlin/src/test/java/org/apache/beam/examples/WindowedWordCountITKotlin.kt
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples
+
+import org.apache.beam.examples.kotlin.WindowedWordCount
+import org.apache.beam.examples.kotlin.common.ExampleUtils
+import
org.apache.beam.examples.kotlin.common.WriteOneFilePerWindow.PerWindowFiles
+import org.apache.beam.sdk.PipelineResult
+import org.apache.beam.sdk.io.FileBasedSink
+import org.apache.beam.sdk.io.FileSystems
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.StreamingOptions
+import org.apache.beam.sdk.testing.SerializableMatcher
+import org.apache.beam.sdk.testing.StreamingIT
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.TestPipelineOptions
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow
+import org.apache.beam.sdk.util.*
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects
+import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists
+import org.hamcrest.Description
+import org.hamcrest.Matchers.equalTo
+import org.hamcrest.TypeSafeMatcher
+import org.joda.time.Duration
+import org.joda.time.Instant
+import org.junit.BeforeClass
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.rules.TestName
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.util.*
+import java.util.concurrent.ThreadLocalRandom
+
/** End-to-end integration test of [WindowedWordCount]. */
@RunWith(JUnit4::class)
class WindowedWordCountITKotlin {

    // JUnit 4 discovers rules through a public field or getter. A plain Kotlin property
    // compiles to a private field, so the annotation must target the getter.
    @get:Rule
    var testName = TestName()

    /** Options for the [WindowedWordCount] Integration Test. */
    interface WindowedWordCountITOptions : WindowedWordCount.Options, TestPipelineOptions, StreamingOptions

    @Test
    @Throws(Exception::class)
    fun testWindowedWordCountInBatchDynamicSharding() {
        val options = batchOptions()
        // This is the default value, but make it explicit.
        options.numShards = null
        testWindowedWordCountPipeline(options)
    }

    @Test
    @Throws(Exception::class)
    fun testWindowedWordCountInBatchStaticSharding() {
        val options = batchOptions()
        options.numShards = 3
        testWindowedWordCountPipeline(options)
    }

    // TODO: add a test with streaming and dynamic sharding after resolving
    // https://issues.apache.org/jira/browse/BEAM-1438

    @Test
    @Category(StreamingIT::class)
    @Throws(Exception::class)
    fun testWindowedWordCountInStreamingStaticSharding() {
        val options = streamingOptions()
        options.numShards = 3
        testWindowedWordCountPipeline(options)
    }

    /**
     * Builds options shared by all variants: input file, synthetic timestamp bounds, window
     * size, and a unique per-test-method output location.
     */
    @Throws(Exception::class)
    private fun defaultOptions(): WindowedWordCountITOptions {
        val options = TestPipeline.testingPipelineOptions().`as`(WindowedWordCountITOptions::class.java)
        options.inputFile = DEFAULT_INPUT
        options.testTimeoutSeconds = 1200L

        // Bound synthetic timestamps to [epoch, epoch + 1 hour) so the ten-minute windows are
        // predictable. Fixed: the upper bound was previously assigned to minTimestampMillis,
        // clobbering the lower bound and leaving maxTimestampMillis unset.
        options.minTimestampMillis = 0L
        options.maxTimestampMillis = Duration.standardHours(1).millis
        options.windowSize = 10

        // Unique output location per test method so concurrent runs cannot collide.
        options.output = FileSystems.matchNewResource(options.tempRoot, true)
            .resolve(
                String.format(
                    "WindowedWordCountITKotlin.%s-%tFT%<tH:%<tM:%<tS.%<tL+%s",
                    testName.methodName, Date(), ThreadLocalRandom.current().nextInt()),
                StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve("results", StandardResolveOptions.RESOLVE_FILE)
            .toString()
        return options
    }

    @Throws(Exception::class)
    private fun streamingOptions(): WindowedWordCountITOptions {
        val options = defaultOptions()
        options.isStreaming = true
        return options
    }

    @Throws(Exception::class)
    private fun batchOptions(): WindowedWordCountITOptions {
        val options = defaultOptions()
        // This is the default value, but make it explicit
        options.isStreaming = false
        return options
    }

    /**
     * Runs the pipeline with an on-success matcher that compares per-word sums across the
     * sharded window output files against counts computed directly from the input file.
     */
    @Throws(Exception::class)
    private fun testWindowedWordCountPipeline(options: WindowedWordCountITOptions) {

        val output = FileBasedSink.convertToFileResourceIfPossible(options.output)
        val filenamePolicy = PerWindowFiles(output)

        // One hour of input in ten-minute windows yields six output file groups.
        val expectedOutputFiles = Lists.newArrayListWithCapacity<ShardedFile>(6)

        for (startMinute in ImmutableList.of(0, 10, 20, 30, 40, 50)) {
            val windowStart =
                Instant(options.minTimestampMillis).plus(Duration.standardMinutes(startMinute.toLong()))
            val filePrefix = filenamePolicy.filenamePrefixForWindow(
                IntervalWindow(windowStart, windowStart.plus(Duration.standardMinutes(10))))
            expectedOutputFiles.add(
                NumberedShardedFile(
                    output
                        .currentDirectory
                        .resolve(filePrefix, StandardResolveOptions.RESOLVE_FILE)
                        .toString() + "*"))
        }

        val inputFile = ExplicitShardedFile(setOf(options.inputFile))

        // For this integration test, input is tiny and we can build the expected counts
        val expectedWordCounts = TreeMap<String, Long>()
        for (line in inputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff())) {
            val words = line.split(ExampleUtils.TOKENIZER_PATTERN.toRegex()).toTypedArray()

            for (word in words) {
                if (word.isNotEmpty()) {
                    expectedWordCounts[word] =
                        MoreObjects.firstNonNull(expectedWordCounts[word], 0L) + 1L
                }
            }
        }

        options.onSuccessMatcher = WordCountsMatcher(expectedWordCounts, expectedOutputFiles)

        WindowedWordCount.runWindowedWordCount(options)
    }

    /**
     * A matcher that bakes in expected word counts, so they can be read directly via some other
     * mechanism, and compares a sharded output file with the result.
     */
    private class WordCountsMatcher(
        private val expectedWordCounts: SortedMap<String, Long>,
        private val outputFiles: List<ShardedFile>) : TypeSafeMatcher<PipelineResult>(), SerializableMatcher<PipelineResult> {
        // Populated by matchesSafely; kept so describeMismatchSafely can report actual values.
        private var actualCounts: SortedMap<String, Long>? = null

        public override fun matchesSafely(pipelineResult: PipelineResult): Boolean {
            try {
                // Load output data
                val outputLines = ArrayList<String>()
                for (outputFile in outputFiles) {
                    outputLines.addAll(
                        outputFile.readFilesWithRetries(Sleeper.DEFAULT, BACK_OFF_FACTORY.backoff()))
                }

                // Since the windowing is nondeterministic we only check the sums
                val counts = TreeMap<String, Long>()
                for (line in outputLines) {
                    val splits = line.split(": ".toRegex()).toTypedArray()
                    val word = splits[0]
                    val count = splits[1].toLong()
                    // TreeMap is a Java class, so Map.merge is directly visible; this replaces
                    // the previous unchecked cast to java.util.Map.
                    counts.merge(word, count) { a, b -> a + b }
                }
                actualCounts = counts
                return counts == expectedWordCounts
            } catch (e: Exception) {
                throw RuntimeException(
                    String.format("Failed to read from sharded output: %s due to exception", outputFiles), e)
            }
        }

        override fun describeTo(description: Description) {
            equalTo(expectedWordCounts).describeTo(description)
        }

        public override fun describeMismatchSafely(pResult: PipelineResult, description: Description) {
            equalTo(expectedWordCounts).describeMismatch(actualCounts, description)
        }
    }

    companion object {

        private const val DEFAULT_INPUT = "gs://apache-beam-samples/shakespeare/sonnets.txt"
        private const val MAX_READ_RETRIES = 4
        private val DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L)
        internal val BACK_OFF_FACTORY = FluentBackoff.DEFAULT
            .withInitialBackoff(DEFAULT_SLEEP_DURATION)
            .withMaxRetries(MAX_READ_RETRIES)

        // @BeforeClass must be a static method for JUnit 4; @JvmStatic generates the
        // static bridge from the companion object.
        @JvmStatic
        @BeforeClass
        fun setUp() {
            PipelineOptionsFactory.register(TestPipelineOptions::class.java)
        }
    }
}
diff --git
a/examples/kotlin/src/test/java/org/apache/beam/examples/WordCountITKotlin.java
b/examples/kotlin/src/test/java/org/apache/beam/examples/WordCountITKotlin.java
new file mode 100644
index 0000000..ef016b9
--- /dev/null
+++
b/examples/kotlin/src/test/java/org/apache/beam/examples/WordCountITKotlin.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.util.Date;
+import org.apache.beam.examples.kotlin.WordCount;
+import org.apache.beam.examples.kotlin.WordCount.WordCountOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end tests of WordCount. */
+@RunWith(JUnit4.class)
+public class WordCountITKotlin {
+ private static final String DEFAULT_INPUT =
+ "gs://apache-beam-samples/shakespeare/winterstale-personae";
+ private static final String DEFAULT_OUTPUT_CHECKSUM =
"ebf895e7324e8a3edc72e7bcc96fa2ba7f690def";
+
+ /**
+ * Options for the WordCount Integration Test.
+ *
+ * <p>Define expected output file checksum to verify WordCount pipeline
result with customized
+ * input.
+ */
+ public interface WordCountITOptions extends TestPipelineOptions,
WordCountOptions {}
+
+ @BeforeClass
+ public static void setUp() {
+ PipelineOptionsFactory.register(TestPipelineOptions.class);
+ }
+
+ @Test
+ public void testE2EWordCount() throws Exception {
+ WordCountITOptions options =
TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
+
+ options.setInputFile(DEFAULT_INPUT);
+ options.setOutput(
+ FileSystems.matchNewResource(options.getTempRoot(), true)
+ .resolve(
+ String.format("WordCountITKotlin-%tF-%<tH-%<tM-%<tS-%<tL", new
Date()),
+ StandardResolveOptions.RESOLVE_DIRECTORY)
+ .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY)
+ .resolve("results", StandardResolveOptions.RESOLVE_FILE)
+ .toString());
+ options.setOnSuccessMatcher(
+ new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() +
"*-of-*"));
+
+ WordCount.runWordCount(options);
+ }
+}
diff --git a/examples/kotlin/src/test/resources/LICENSE
b/examples/kotlin/src/test/resources/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/examples/kotlin/src/test/resources/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/settings.gradle b/settings.gradle
index b974ae1..2aa3188 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,6 +20,8 @@ rootProject.name = "beam"
include ":release"
+include "beam-examples-kotlin"
+project(":beam-examples-kotlin").dir = file("examples/kotlin")
include "beam-examples-java"
project(":beam-examples-java").dir = file("examples/java")
include "beam-model-fn-execution"