This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 82b6afa  [BEAM-6493] Convert the WordCount samples to Kotlin (#8291)
82b6afa is described below

commit 82b6afaf363355c7b96d710f9e071443eb4b2bf6
Author: Harshit Dwivedi <[email protected]>
AuthorDate: Sat Apr 13 00:16:04 2019 +0530

    [BEAM-6493] Convert the WordCount samples to Kotlin (#8291)
    
    [BEAM-6493] Convert the WordCount samples to Kotlin
---
 .test-infra/jenkins/job_PreCommit_Java.groovy      |   1 +
 .../job_PreCommit_Java_Examples_Dataflow.groovy    |   1 +
 .../job_PreCommit_Java_PortabilityApi.groovy       |   1 +
 .test-infra/jenkins/job_PreCommit_Spotless.groovy  |   1 +
 examples/{ => kotlin}/OWNERS                       |   1 +
 examples/kotlin/README.md                          |  57 +++
 examples/kotlin/build.gradle                       | 146 ++++++++
 .../beam/examples/kotlin/DebuggingWordCount.kt     | 162 +++++++++
 .../beam/examples/kotlin/MinimalWordCount.kt       | 122 +++++++
 .../beam/examples/kotlin/WindowedWordCount.kt      | 216 +++++++++++
 .../org/apache/beam/examples/kotlin/WordCount.kt   | 188 ++++++++++
 .../kotlin/common/ExampleBigQueryTableOptions.kt   |  49 +++
 .../beam/examples/kotlin/common/ExampleOptions.kt  |  31 +-
 .../ExamplePubsubTopicAndSubscriptionOptions.kt    |  43 +++
 .../kotlin/common/ExamplePubsubTopicOptions.kt     |  43 +++
 .../beam/examples/kotlin/common/ExampleUtils.kt    | 403 +++++++++++++++++++++
 .../kotlin/common/WriteOneFilePerWindow.kt         |  99 +++++
 .../examples/DebuggingWordCountTestKotlin.java     |  57 +++
 .../beam/examples/MinimalWordCountTestKotlin.java  |  91 +++++
 .../beam/examples/WindowedWordCountITKotlin.kt     | 224 ++++++++++++
 .../apache/beam/examples/WordCountITKotlin.java    |  72 ++++
 examples/kotlin/src/test/resources/LICENSE         | 202 +++++++++++
 settings.gradle                                    |   2 +
 23 files changed, 2194 insertions(+), 18 deletions(-)

diff --git a/.test-infra/jenkins/job_PreCommit_Java.groovy 
b/.test-infra/jenkins/job_PreCommit_Java.groovy
index 00a67b4..6d63979 100644
--- a/.test-infra/jenkins/job_PreCommit_Java.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java.groovy
@@ -28,6 +28,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
       '^sdks/java/.*$',
       '^runners/.*$',
       '^examples/java/.*$',
+      '^examples/kotlin/.*$',
       '^release/.*$',
     ]
 )
diff --git a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy 
b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
index e02dd12..7855085 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
@@ -28,6 +28,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
       '^sdks/java/.*$',
       '^runners/google-cloud-dataflow-java/.*$',
       '^examples/java/.*$',
+      '^examples/kotlin/.*$',
       '^release/.*$',
     ]
 )
diff --git a/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy 
b/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy
index 5dfc426..37cdd07 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Java_PortabilityApi.groovy
@@ -28,6 +28,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
       '^sdks/java/.*$',
       '^runners/google-cloud-dataflow-java/worker.*$',
       '^examples/java/.*$',
+      '^examples/kotlin/.*$',
       '^release/.*$',
     ]
 )
diff --git a/.test-infra/jenkins/job_PreCommit_Spotless.groovy 
b/.test-infra/jenkins/job_PreCommit_Spotless.groovy
index 68eb312..7dc033d 100644
--- a/.test-infra/jenkins/job_PreCommit_Spotless.groovy
+++ b/.test-infra/jenkins/job_PreCommit_Spotless.groovy
@@ -27,6 +27,7 @@ PrecommitJobBuilder builder = new PrecommitJobBuilder(
       '^sdks/java/.*$',
       '^runners/.*$',
       '^examples/java/.*$',
+      '^examples/kotlin/.*$',
     ]
 )
 builder.build()
diff --git a/examples/OWNERS b/examples/kotlin/OWNERS
similarity index 86%
rename from examples/OWNERS
rename to examples/kotlin/OWNERS
index 3a2b349..3811246 100644
--- a/examples/OWNERS
+++ b/examples/kotlin/OWNERS
@@ -1,4 +1,5 @@
 # See the OWNERS docs at https://s.apache.org/beam-owners
 
 reviewers:
+  - lukecwik
   - aaltay
diff --git a/examples/kotlin/README.md b/examples/kotlin/README.md
new file mode 100644
index 0000000..a820a36
--- /dev/null
+++ b/examples/kotlin/README.md
@@ -0,0 +1,57 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Example Pipelines
+
+The examples included in this module serve to demonstrate the basic
+functionality of Apache Beam, and act as starting points for
+the development of more complex pipelines.
+
+## Word Count
+
+A good starting point for new users is our set of
+[word 
count](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin)
 examples, which computes word frequencies.  This series of four successively 
more detailed pipelines is described in detail in the accompanying 
[walkthrough](https://beam.apache.org/get-started/wordcount-example/).
+
+1. 
[`MinimalWordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt)
 is the simplest word count pipeline and introduces basic concepts like 
[Pipelines](https://beam.apache.org/documentation/programming-guide/#pipeline),
+[PCollections](https://beam.apache.org/documentation/programming-guide/#pcollection),
+[ParDo](https://beam.apache.org/documentation/programming-guide/#transforms-pardo),
+and [reading and writing 
data](https://beam.apache.org/documentation/programming-guide/#io) from 
external storage.
+
+1. 
[`WordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt)
 introduces best practices like 
[PipelineOptions](https://beam.apache.org/documentation/programming-guide/#pipeline)
 and custom 
[PTransforms](https://beam.apache.org/documentation/programming-guide/#transforms-composite).
+
+1. 
[`DebuggingWordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt)
+demonstrates some best practices for instrumenting your pipeline code.
+
+1. 
[`WindowedWordCount`](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt)
 shows how to run the same pipeline over either unbounded PCollections in 
streaming mode or bounded PCollections in batch mode.
+
+## Running Examples
+
+See [Apache Beam WordCount 
Example](https://beam.apache.org/get-started/wordcount-example/) for 
information on running these examples.
+
+## Beyond Word Count [WIP]
+
+After you've finished running your first few word count pipelines, take a look 
at the 
[cookbook](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/cookbook)
+directory for some common and useful patterns like joining, filtering, and 
combining.
+
+The 
[complete](https://github.com/apache/beam/blob/master/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/complete)
+directory contains a few realistic end-to-end pipelines.
+
+See the other examples as well. This directory includes a Java 8 version of the
+MinimalWordCount example, as well as a series of examples in a simple 'mobile
+gaming' domain. This series introduces some advanced concepts.
diff --git a/examples/kotlin/build.gradle b/examples/kotlin/build.gradle
new file mode 100644
index 0000000..9d417e3
--- /dev/null
+++ b/examples/kotlin/build.gradle
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import groovy.json.JsonOutput
+
+plugins { id 'org.apache.beam.module'
+    id 'org.jetbrains.kotlin.jvm' version '1.3.21'
+}
+
+applyJavaNature(exportJavadoc: false)
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: Examples :: Kotlin"
+ext.summary = """Apache Beam SDK provides a simple, Kotlin-based
+interface for processing virtually any size data. This
+artifact includes all Apache Beam Kotlin SDK examples."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+// TODO: Add apexRunner - https://issues.apache.org/jira/browse/BEAM-3583
+def preCommitRunners = ["directRunner", "flinkRunner", "sparkRunner"]
+for (String runner : preCommitRunners) {
+  configurations.create(runner + "PreCommit")
+}
+configurations.sparkRunnerPreCommit {
+  // Ban certain dependencies to prevent a StackOverflow within Spark
+  // because JUL -> SLF4J -> JUL, and similarly JDK14 -> SLF4J -> JDK14
+  exclude group: "org.slf4j", module: "jul-to-slf4j"
+  exclude group: "org.slf4j", module: "slf4j-jdk14"
+}
+
+dependencies {
+  shadow library.java.vendored_guava_20_0
+  shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
+  shadow project(path: 
":beam-sdks-java-extensions-google-cloud-platform-core", configuration: 
"shadow")
+  shadow project(path: ":beam-sdks-java-io-google-cloud-platform", 
configuration: "shadow")
+  shadow library.java.google_api_client
+  shadow library.java.google_api_services_bigquery
+  shadow library.java.google_http_client
+  shadow library.java.bigdataoss_util
+  shadow library.java.google_auth_library_oauth2_http
+  shadow library.java.google_auth_library_credentials
+  shadow library.java.avro
+  shadow library.java.google_api_services_pubsub
+  shadow library.java.datastore_v1_proto_client
+  shadow library.java.datastore_v1_protos
+  shadow library.java.joda_time
+  shadow library.java.slf4j_api
+  shadow library.java.slf4j_jdk14
+  runtime project(path: ":beam-runners-direct-java", configuration: "shadow")
+  shadowTest project(path: ":beam-sdks-java-io-google-cloud-platform", 
configuration: "shadow")
+  shadowTest library.java.hamcrest_core
+  shadowTest library.java.hamcrest_library
+  shadowTest library.java.junit
+  shadowTest library.java.mockito_core
+
+  // Add dependencies for the PreCommit configurations
+  // For each runner a project level dependency on the examples project.
+  for (String runner : preCommitRunners) {
+    delegate.add(runner + "PreCommit", project(path: ":beam-examples-kotlin", 
configuration: "shadow"))
+    delegate.add(runner + "PreCommit", project(path: ":beam-examples-kotlin", 
configuration: "shadowTest"))
+  }
+  // https://issues.apache.org/jira/browse/BEAM-3583
+  // apexRunnerPreCommit project(path: ":beam-runners-apex", configuration: 
"shadow")
+  directRunnerPreCommit project(path: ":beam-runners-direct-java", 
configuration: "shadow")
+  flinkRunnerPreCommit project(path: ":beam-runners-flink_2.11", 
configuration: "shadow")
+  // TODO: Make the netty version used configurable, we add netty-all 
4.1.17.Final so it appears on the classpath
+  // before 4.1.8.Final defined by Apache Beam
+  sparkRunnerPreCommit "io.netty:netty-all:4.1.17.Final"
+  sparkRunnerPreCommit project(path: ":beam-runners-spark", configuration: 
"shadow")
+  sparkRunnerPreCommit project(path: ":beam-sdks-java-io-hadoop-file-system", 
configuration: "shadow")
+  sparkRunnerPreCommit library.java.spark_streaming
+  sparkRunnerPreCommit library.java.spark_core
+  implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
+}
+
+/*
+ * Create a ${runner}PreCommit task for each runner which runs a set
+ * of integration tests for WordCount and WindowedWordCount.
+ */
+def preCommitRunnerClass = [
+  apexRunner: "org.apache.beam.runners.apex.TestApexRunner",
+  directRunner: "org.apache.beam.runners.direct.DirectRunner",
+  flinkRunner: "org.apache.beam.runners.flink.TestFlinkRunner",
+  sparkRunner: "org.apache.beam.runners.spark.TestSparkRunner",
+]
+def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+def gcsTempRoot = project.findProperty('gcsTempRoot') ?: 
'gs://temp-storage-for-end-to-end-tests/'
+
+for (String runner : preCommitRunners) {
+  tasks.create(name: runner + "PreCommit", type: Test) {
+    def preCommitBeamTestPipelineOptions = [
+       "--project=${gcpProject}",
+       "--tempRoot=${gcsTempRoot}",
+       "--runner=" + preCommitRunnerClass[runner],
+    ]
+    classpath = configurations."${runner}PreCommit"
+    include "**/WordCountIT.class"
+    if (!"sparkRunner".equals(runner)) {
+      include "**/WindowedWordCountIT.class"
+    }
+    forkEvery 1
+    maxParallelForks 4
+    systemProperty "beamTestPipelineOptions", 
JsonOutput.toJson(preCommitBeamTestPipelineOptions)
+  }
+}
+
+/* Define a common precommit task which depends on all the individual 
precommits. */
+task preCommit() {
+  for (String runner : preCommitRunners) {
+    dependsOn runner + "PreCommit"
+  }
+}
+
+compileKotlin {
+    kotlinOptions {
+        jvmTarget = "1.8"
+    }
+}
+compileTestKotlin {
+    kotlinOptions {
+        jvmTarget = "1.8"
+    }
+}
+repositories {
+  mavenCentral()
+}
+
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
new file mode 100644
index 0000000..bd16582
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.metrics.Metrics
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.testing.PAssert
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.values.KV
+import org.slf4j.LoggerFactory
+import java.util.regex.Pattern
+
+/**
+ * An example that verifies word counts in Shakespeare and includes Beam best 
practices.
+ *
+ *
+ * This class, [DebuggingWordCount], is the third in a series of four 
successively more
+ * detailed 'word count' examples. You may first want to take a look at 
[MinimalWordCount] and
+ * [WordCount]. After you've looked at this example, then see the 
[WindowedWordCount]
+ * pipeline, for introduction of additional concepts.
+ *
+ *
+ * Basic concepts, also in the MinimalWordCount and WordCount examples: 
Reading text files;
+ * counting a PCollection; executing a Pipeline both locally and using a 
selected runner; defining
+ * DoFns.
+ *
+ *
+ * New Concepts:
+ *
+ * <pre>
+ * 1. Logging using SLF4J, even in a distributed environment
+ * 2. Creating a custom metric (runners have varying levels of support)
+ * 3. Testing your Pipeline via PAssert
+</pre> *
+ *
+ *
+ * To execute this pipeline locally, specify general pipeline configuration:
+ *
+ * <pre>`--project=YOUR_PROJECT_ID
+`</pre> *
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ *
+ * The input file defaults to a public data set containing the text of of King 
Lear, by William
+ * Shakespeare. You can override it and choose your own input with 
`--inputFile`.
+ */
+public object DebuggingWordCount {
+    /** A DoFn that filters for a specific key based upon a regular 
expression.  */
+    public class FilterTextFn(pattern: String) : DoFn<KV<String, Long>, 
KV<String, Long>>() {
+
+        private val filter: Pattern = Pattern.compile(pattern)
+
+        /**
+         * Concept #2: A custom metric can track values in your pipeline as it 
runs. Each runner
+         * provides varying levels of support for metrics, and may expose them 
in a dashboard, etc.
+         */
+        private val matchedWords = Metrics.counter(FilterTextFn::class.java, 
"matchedWords")
+
+        private val unmatchedWords = Metrics.counter(FilterTextFn::class.java, 
"unmatchedWords")
+
+        @ProcessElement
+        fun processElement(c: ProcessContext) {
+            if (filter.matcher(c.element().key).matches()) {
+                // Log at the "DEBUG" level each element that we match. When 
executing this pipeline
+                // these log lines will appear only if the log level is set to 
"DEBUG" or lower.
+                LOG.debug("Matched: ${c.element().key}")
+                matchedWords.inc()
+                c.output(c.element())
+            } else {
+                // Log at the "TRACE" level each element that is not matched. 
Different log levels
+                // can be used to control the verbosity of logging providing 
an effective mechanism
+                // to filter less important information.
+                LOG.trace("Did not match: ${c.element().key}")
+                unmatchedWords.inc()
+            }
+        }
+
+        companion object {
+            /**
+             * Concept #1: The logger below uses the fully qualified class 
name of FilterTextFn as the
+             * logger. Depending on your SLF4J configuration, log statements 
will likely be qualified by
+             * this name.
+             *
+             *
+             * Note that this is entirely standard SLF4J usage. Some runners 
may provide a default SLF4J
+             * configuration that is most appropriate for their logging 
integration.
+             */
+            private val LOG = LoggerFactory.getLogger(FilterTextFn::class.java)
+        }
+    }
+
+    /**
+     * Options supported by [DebuggingWordCount].
+     *
+     *
+     * Inherits standard configuration options and all options defined in [ ].
+     */
+    public interface WordCountOptions : WordCount.WordCountOptions {
+
+        @get:Description("Regex filter pattern to use in DebuggingWordCount. " 
+ "Only words matching this pattern will be counted.")
+        @get:Default.String("Flourish|stomach")
+        var filterPattern: String
+    }
+
+    @JvmStatic
+    fun runDebuggingWordCount(options: WordCountOptions) {
+        val p = Pipeline.create(options)
+
+        val filteredWords = p.apply("ReadLines", 
TextIO.read().from(options.inputFile))
+                .apply(WordCount.CountWords())
+                .apply(ParDo.of(FilterTextFn(options.filterPattern)))
+
+        /*
+     * Concept #3: PAssert is a set of convenient PTransforms in the style of
+     * Hamcrest's collection matchers that can be used when writing Pipeline 
level tests
+     * to validate the contents of PCollections. PAssert is best used in unit 
tests
+     * with small data sets but is demonstrated here as a teaching tool.
+     *
+     * <p>Below we verify that the set of filtered words matches our expected 
counts. Note
+     * that PAssert does not provide any output and that successful completion 
of the
+     * Pipeline implies that the expectations were met. Learn more at
+     * https://beam.apache.org/documentation/pipelines/test-your-pipeline/ on 
how to test
+     * your Pipeline and see {@link DebuggingWordCountTest} for an example 
unit test.
+     */
+        val expectedResults = listOf(KV.of("Flourish", 3L), KV.of("stomach", 
1L))
+        PAssert.that(filteredWords).containsInAnyOrder(expectedResults)
+
+        p.run().waitUntilFinish()
+    }
+
+    @JvmStatic
+    fun main(args: Array<String>) {
+        val options = (PipelineOptionsFactory.fromArgs(*args).withValidation() 
as WordCountOptions)
+
+        runDebuggingWordCount(options)
+    }
+}
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt
new file mode 100644
index 0000000..2c752c4
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/MinimalWordCount.kt
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.TypeDescriptors
+
+/**
+ * An example that counts words in Shakespeare.
+ *
+ *
+ * This class, [MinimalWordCount], is the first in a series of four 
successively more
+ * detailed 'word count' examples. Here, for simplicity, we don't show any 
error-checking or
+ * argument processing, and focus on construction of the pipeline, which 
chains together the
+ * application of core transforms.
+ *
+ *
+ * Next, see the [WordCount] pipeline, then the [DebuggingWordCount], and 
finally the
+ * [WindowedWordCount] pipeline, for more detailed examples that introduce 
additional
+ * concepts.
+ *
+ *
+ * Concepts:
+ *
+ * <pre>
+ * 1. Reading data from text files
+ * 2. Specifying 'inline' transforms
+ * 3. Counting items in a PCollection
+ * 4. Writing data to text files
+</pre> *
+ *
+ *
+ * No arguments are required to run this pipeline. It will be executed with 
the DirectRunner. You
+ * can see the results in the output files in your current working directory, 
with names like
+ * "wordcounts-00001-of-00005. When running on a distributed service, you 
would use an appropriate
+ * file service.
+ */
+public object MinimalWordCount {
+
+    @JvmStatic
+    fun main(args: Array<String>) {
+
+        // Create a PipelineOptions object. This object lets us set various 
execution
+        // options for our pipeline, such as the runner you wish to use. This 
example
+        // will run with the DirectRunner by default, based on the class path 
configured
+        // in its dependencies.
+        val options = PipelineOptionsFactory.create()
+
+        // In order to run your pipeline, you need to make following runner 
specific changes:
+        //
+        // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
+        // or FlinkRunner.
+        // CHANGE 2/3: Specify runner-required options.
+        // For BlockingDataflowRunner, set project and temp location as 
follows:
+        //   val dataflowOptions : DataflowPipelineOptions = 
options.as(DataflowPipelineOptions::class.java)
+        //   dataflowOptions.runner = BlockingDataflowRunner::class.java
+        //   dataflowOptions.project = "SET_YOUR_PROJECT_ID_HERE"
+        //   dataflowOptions.tempLocation = 
"gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"
+        // For FlinkRunner, set the runner as follows. See {@code 
FlinkPipelineOptions}
+        // for more details.
+        //   options.as(FlinkPipelineOptions::class.java)
+        //      .setRunner(FlinkRunner::class.java)
+
+        // Create the Pipeline object with the options we defined above
+        val p = Pipeline.create(options)
+
+        // Concept #1: Apply a root transform to the pipeline; in this case, 
TextIO.Read to read a set
+        // of input text files. TextIO.Read returns a PCollection where each 
element is one line from
+        // the input text (a set of Shakespeare's texts).
+
+        // This example reads a public data set consisting of the complete 
works of Shakespeare.
+        
p.apply<PCollection<String>>(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
+
+                // Concept #2: Apply a FlatMapElements transform the 
PCollection of text lines.
+                // This transform splits the lines in PCollection<String>, 
where each element is an
+                // individual word in Shakespeare's collected texts.
+                .apply(
+                        FlatMapElements.into(TypeDescriptors.strings())
+                                .via(ProcessFunction<String, List<String>> { 
input -> input.split("[^\\p{L}]+").toList() })
+                )
+                // We use a Filter transform to avoid empty word
+                .apply(Filter.by(SerializableFunction<String, Boolean> { input 
-> !input.isEmpty() }))
+                // Concept #3: Apply the Count transform to our PCollection of 
individual words. The Count
+                // transform returns a new PCollection of key/value pairs, 
where each key represents a
+                // unique word in the text. The associated value is the 
occurrence count for that word.
+                .apply(Count.perElement<String>())
+                // Apply a MapElements transform that formats our PCollection 
of word counts into a
+                // printable string, suitable for writing to an output file.
+                .apply(
+                        MapElements.into(TypeDescriptors.strings())
+                                .via(ProcessFunction<KV<String, Long>, String> 
{ input -> "${input.key} : ${input.value}" })
+                )
+                // Concept #4: Apply a write transform, TextIO.Write, at the 
end of the pipeline.
+                // TextIO.Write writes the contents of a PCollection (in this 
case, our PCollection of
+                // formatted strings) to a series of text files.
+                //
+                // By default, it will write to a set of files with names like 
wordcounts-00001-of-00005
+                .apply(TextIO.write().to("wordcounts"))
+
+        p.run().waitUntilFinish()
+    }
+}
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt
new file mode 100644
index 0000000..f57a62b
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.examples.kotlin.common.ExampleBigQueryTableOptions
+import org.apache.beam.examples.kotlin.common.ExampleOptions
+import org.apache.beam.examples.kotlin.common.WriteOneFilePerWindow
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.options.*
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.MapElements
+import org.apache.beam.sdk.transforms.ParDo
+import org.apache.beam.sdk.transforms.windowing.FixedWindows
+import org.apache.beam.sdk.transforms.windowing.Window
+import org.apache.beam.sdk.values.PDone
+import org.joda.time.Duration
+import org.joda.time.Instant
+import java.io.IOException
+import java.util.concurrent.ThreadLocalRandom
+
+/**
+ * An example that counts words in text, and can run over either unbounded or 
bounded input
+ * collections.
+ *
+ *
+ * This class, [WindowedWordCount], is the last in a series of four 
successively more
+ * detailed 'word count' examples. First take a look at [MinimalWordCount], 
[WordCount],
+ * and [DebuggingWordCount].
+ *
+ *
+ * Basic concepts, also in the MinimalWordCount, WordCount, and 
DebuggingWordCount examples:
+ * Reading text files; counting a PCollection; writing to GCS; executing a 
Pipeline both locally and
+ * using a selected runner; defining DoFns; user-defined PTransforms; defining 
PipelineOptions.
+ *
+ *
+ * New Concepts:
+ *
+ * <pre>
+ * 1. Unbounded and bounded pipeline input modes
+ * 2. Adding timestamps to data
+ * 3. Windowing
+ * 4. Re-using PTransforms over windowed PCollections
+ * 5. Accessing the window of an element
+ * 6. Writing data to per-window text files
+</pre> *
+ *
+ *
+ * By default, the examples will run with the `DirectRunner`. To change the 
runner,
+ * specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ * See examples/kotlin/README.md for instructions about how to configure 
different runners.
+ *
+ *
+ * To execute this pipeline locally, specify a local output file (if using the 
`DirectRunner`) or output prefix on a supported distributed file system.
+ *
+ * <pre>`--output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+`</pre> *
+ *
+ *
+ * The input file defaults to a public data set containing the text of of King 
Lear, by William
+ * Shakespeare. You can override it and choose your own input with 
`--inputFile`.
+ *
+ *
+ * By default, the pipeline will do fixed windowing, on 10-minute windows. You 
can change this
+ * interval by setting the `--windowSize` parameter, e.g. `--windowSize=15` for
+ * 15-minute windows.
+ *
+ *
+ * The example will try to cancel the pipeline on the signal to terminate the 
process (CTRL-C).
+ */
+public object WindowedWordCount {
+    const val WINDOW_SIZE = 10 // Default window duration in minutes
+
+    /**
+     * Concept #2: A DoFn that sets the data element timestamp. This is a 
silly method, just for this
+     * example, for the bounded data case.
+     *
+     *
+     * Imagine that many ghosts of Shakespeare are all typing madly at the 
same time to recreate
+     * his masterworks. Each line of the corpus will get a random associated 
timestamp somewhere in a
+     * 2-hour period.
+     */
+    public class AddTimestampFn(private val minTimestamp: Instant, private val 
maxTimestamp: Instant) : DoFn<String, String>() {
+
+        @ProcessElement
+        fun processElement(@Element element: String, receiver: 
DoFn.OutputReceiver<String>) {
+            val randomTimestamp = Instant(
+                    ThreadLocalRandom.current()
+                            .nextLong(minTimestamp.millis, 
maxTimestamp.millis))
+
+            /*
+       * Concept #2: Set the data element with that timestamp.
+       */
+            receiver.outputWithTimestamp(element, Instant(randomTimestamp))
+        }
+    }
+
+    /** A [DefaultValueFactory] that returns the current system time.  */
+    public class DefaultToCurrentSystemTime : DefaultValueFactory<Long> {
+        override fun create(options: PipelineOptions): Long? {
+            return System.currentTimeMillis()
+        }
+    }
+
+    /** A [DefaultValueFactory] that returns the minimum timestamp plus one 
hour.  */
+    public class DefaultToMinTimestampPlusOneHour : DefaultValueFactory<Long> {
+        override fun create(options: PipelineOptions): Long? {
+            return (options as Options).minTimestampMillis!! + 
Duration.standardHours(1).millis
+        }
+    }
+
+    /**
+     * Options for [WindowedWordCount].
+     *
+     *
+     * Inherits standard example configuration options, which allow 
specification of the runner, as
+     * well as the [WordCount.WordCountOptions] support for specification of 
the input and
+     * output files.
+     */
+    public interface Options : WordCount.WordCountOptions, ExampleOptions, 
ExampleBigQueryTableOptions {
+        @get:Description("Fixed window duration, in minutes")
+        @get:Default.Integer(WINDOW_SIZE)
+        var windowSize: Int?
+
+        @get:Description("Minimum randomly assigned timestamp, in 
milliseconds-since-epoch")
+        @get:Default.InstanceFactory(DefaultToCurrentSystemTime::class)
+        var minTimestampMillis: Long?
+
+        @get:Description("Maximum randomly assigned timestamp, in 
milliseconds-since-epoch")
+        @get:Default.InstanceFactory(DefaultToMinTimestampPlusOneHour::class)
+        var maxTimestampMillis: Long?
+
+        @get:Description("Fixed number of shards to produce per window")
+        var numShards: Int?
+    }
+
+    @Throws(IOException::class)
+    @JvmStatic
+    fun runWindowedWordCount(options: Options) {
+        val output = options.output
+        val minTimestamp = Instant(options.minTimestampMillis)
+        val maxTimestamp = Instant(options.maxTimestampMillis)
+
+        val pipeline = Pipeline.create(options)
+
+        /*
+     * Concept #1: the Beam SDK lets us run the same pipeline with either a 
bounded or
+     * unbounded input source.
+     */
+        val input = pipeline
+                /* Read from the GCS file. */
+                .apply(TextIO.read().from(options.inputFile))
+                // Concept #2: Add an element timestamp, using an artificial 
time just to show
+                // windowing.
+                // See AddTimestampFn for more detail on this.
+                .apply(ParDo.of(AddTimestampFn(minTimestamp, maxTimestamp)))
+
+        /*
+     * Concept #3: Window into fixed windows. The fixed window size for this 
example defaults to 1
+     * minute (you can change this with a command-line option). See the 
documentation for more
+     * information on how fixed windows work, and for information on the other 
types of windowing
+     * available (e.g., sliding windows).
+     */
+        val windowedWords = input.apply(
+                
Window.into<String>(FixedWindows.of(Duration.standardMinutes(options.windowSize!!.toLong()))))
+
+        /*
+     * Concept #4: Re-use our existing CountWords transform that does not have 
knowledge of
+     * windows over a PCollection containing windowed values.
+     */
+        val wordCounts = windowedWords.apply(WordCount.CountWords())
+
+        /*
+     * Concept #5: Format the results and write to a sharded file partitioned 
by window, using a
+     * simple ParDo operation. Because there may be failures followed by 
retries, the
+     * writes must be idempotent, but the details of writing to files is 
elided here.
+     */
+        wordCounts
+                .apply(MapElements.via(WordCount.FormatAsTextFn()))
+                .apply<PDone>(WriteOneFilePerWindow(output, options.numShards))
+
+        val result = pipeline.run()
+        try {
+            result.waitUntilFinish()
+        } catch (exc: Exception) {
+            result.cancel()
+        }
+
+    }
+
+    @Throws(IOException::class)
+    @JvmStatic
+    fun main(args: Array<String>) {
+        val options = (PipelineOptionsFactory.fromArgs(*args).withValidation() 
as Options)
+        runWindowedWordCount(options)
+    }
+}
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt
new file mode 100644
index 0000000..9cf6d3f
--- /dev/null
+++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin
+
+import org.apache.beam.examples.kotlin.common.ExampleUtils
+import org.apache.beam.sdk.Pipeline
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.metrics.Metrics
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.Validation.Required
+import org.apache.beam.sdk.transforms.*
+import org.apache.beam.sdk.values.KV
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PDone
+
+/**
+ * An example that counts words in Shakespeare and includes Beam best 
practices.
+ *
+ *
+ * This class, [WordCount], is the second in a series of four successively 
more detailed
+ * 'word count' examples. You may first want to take a look at 
[MinimalWordCount]. After
+ * you've looked at this example, then see the [DebuggingWordCount] pipeline, 
for introduction
+ * of additional concepts.
+ *
+ *
+ * For a detailed walkthrough of this example, see [
+ * https://beam.apache.org/get-started/wordcount-example/ 
](https://beam.apache.org/get-started/wordcount-example/)
+ *
+ *
+ * Basic concepts, also in the MinimalWordCount example: Reading text files; 
counting a
+ * PCollection; writing to text files
+ *
+ *
+ * New Concepts:
+ *
+ * <pre>
+ * 1. Executing a Pipeline both locally and using the selected runner
+ * 2. Using ParDo with static DoFns defined out-of-line
+ * 3. Building a composite transform
+ * 4. Defining your own pipeline options
+</pre> *
+ *
+ *
+ * Concept #1: you can execute this pipeline either locally or using by 
selecting another runner.
+ * These are now command-line options and not hard-coded as they were in the 
MinimalWordCount
+ * example.
+ *
+ *
+ * To change the runner, specify:
+ *
+ * <pre>`--runner=YOUR_SELECTED_RUNNER
+`</pre> *
+ *
+ *
+ * To execute this pipeline, specify a local output file (if using the 
`DirectRunner`) or
+ * output prefix on a supported distributed file system.
+ *
+ * <pre>`--output=[YOUR_LOCAL_FILE | YOUR_OUTPUT_PREFIX]
+`</pre> *
+ *
+ *
+ * The input file defaults to a public data set containing the text of of King 
Lear, by William
+ * Shakespeare. You can override it and choose your own input with 
`--inputFile`.
+ */
+public object WordCount {
+
+    /**
+     * Concept #2: You can make your pipeline assembly code less verbose by 
defining your DoFns
+     * statically out-of-line. This DoFn tokenizes lines of text into 
individual words; we pass it to
+     * a ParDo in the pipeline.
+     */
+    public class ExtractWordsFn : DoFn<String, String>() {
+        private val emptyLines = Metrics.counter(ExtractWordsFn::class.java, 
"emptyLines")
+        private val lineLenDist = 
Metrics.distribution(ExtractWordsFn::class.java, "lineLenDistro")
+
+        @ProcessElement
+        fun processElement(@Element element: String, receiver: 
DoFn.OutputReceiver<String>) {
+            lineLenDist.update(element.length.toLong())
+            if (element.trim(' ').isEmpty()) {
+                emptyLines.inc()
+            }
+
+            // Split the line into words.
+            val words = 
element.split(ExampleUtils.TOKENIZER_PATTERN.toRegex()).toTypedArray()
+
+            // Output each word encountered into the output PCollection.
+            for (word in words) {
+                if (!word.isEmpty()) {
+                    receiver.output(word)
+                }
+            }
+        }
+    }
+
+    /** A SimpleFunction that converts a Word and Count into a printable 
string.  */
+    public class FormatAsTextFn : SimpleFunction<KV<String, Long>, String>() {
+        override fun apply(input: KV<String, Long>): String {
+            return "${input.key} : ${input.value}"
+        }
+    }
+
+    /**
+     * A PTransform that converts a PCollection containing lines of text into 
a PCollection of
+     * formatted word counts.
+     *
+     *
+     * Concept #3: This is a custom composite transform that bundles two 
transforms (ParDo and
+     * Count) as a reusable PTransform subclass. Using composite transforms 
allows for easy reuse,
+     * modular testing, and an improved monitoring experience.
+     */
+    public class CountWords : PTransform<PCollection<String>, 
PCollection<KV<String, Long>>>() {
+        override fun expand(lines: PCollection<String>): 
PCollection<KV<String, Long>> {
+
+            // Convert lines of text into individual words.
+            val words = lines.apply(ParDo.of(ExtractWordsFn()))
+
+            // Count the number of times each word occurs.
+            val wordCounts = words.apply(Count.perElement())
+
+            return wordCounts
+        }
+    }
+
+    /**
+     * Options supported by [WordCount].
+     *
+     *
+     * Concept #4: Defining your own configuration options. Here, you can add 
your own arguments to
+     * be processed by the command-line parser, and specify default values for 
them. You can then
+     * access the options values in your pipeline code.
+     *
+     *
+     * Inherits standard configuration options.
+     */
+    public interface WordCountOptions : PipelineOptions {
+
+        /**
+         * By default, this example reads from a public dataset containing the 
text of King Lear. Set
+         * this option to choose a different input file or glob.
+         */
+        @get:Description("Path of the file to read from")
+        
@get:Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
+        var inputFile: String
+
+        /** Set this required option to specify where to write the output.  */
+        @get:Description("Path of the file to write to")
+        @get:Required
+        var output: String
+    }
+
+    @JvmStatic
+    fun runWordCount(options: WordCountOptions) {
+        val p = Pipeline.create(options)
+
+        // Concepts #2 and #3: Our pipeline applies the composite CountWords 
transform, and passes the
+        // static FormatAsTextFn() to the ParDo transform.
+        p.apply("ReadLines", TextIO.read().from(options.inputFile))
+                .apply(CountWords())
+                .apply(MapElements.via(FormatAsTextFn()))
+                .apply<PDone>("WriteCounts", TextIO.write().to(options.output))
+
+        p.run().waitUntilFinish()
+    }
+
+    @JvmStatic
+    fun main(args: Array<String>) {
+        val options = (PipelineOptionsFactory.fromArgs(*args).withValidation() 
as WordCountOptions)
+        runWordCount(options)
+    }
+}
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt
new file mode 100644
index 0000000..176ff7a
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import com.google.api.services.bigquery.model.TableSchema
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.DefaultValueFactory
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+
+/**
+ * Options that can be used to configure BigQuery tables in Beam examples. The 
project defaults to
+ * the project being used to run the example.
+ */
+interface ExampleBigQueryTableOptions : GcpOptions {
+    @get:Description("BigQuery dataset name")
+    @get:Default.String("beam_examples")
+    var bigQueryDataset: String
+
+    @get:Description("BigQuery table name")
+    @get:Default.InstanceFactory(BigQueryTableFactory::class)
+    var bigQueryTable: String
+
+    @get:Description("BigQuery table schema")
+    var bigQuerySchema: TableSchema
+
+    /** Returns the job name as the default BigQuery table name.  */
+    class BigQueryTableFactory : DefaultValueFactory<String> {
+        override fun create(options: PipelineOptions): String {
+            return options.jobName.replace('-', '_')
+        }
+    }
+}
diff --git a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleOptions.kt
similarity index 57%
copy from .test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
copy to 
examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleOptions.kt
index e02dd12..f8bb4a1 100644
--- a/.test-infra/jenkins/job_PreCommit_Java_Examples_Dataflow.groovy
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleOptions.kt
@@ -15,24 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.beam.examples.kotlin.common
 
-import PrecommitJobBuilder
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
 
-PrecommitJobBuilder builder = new PrecommitJobBuilder(
-    scope: this,
-    nameBase: 'Java_Examples_Dataflow',
-    gradleTask: ':javaExamplesDataflowPreCommit',
-    gradleSwitches: ['-PdisableSpotlessCheck=true'], // spotless checked in 
separate pre-commit
-    triggerPathPatterns: [
-      '^model/.*$',
-      '^sdks/java/.*$',
-      '^runners/google-cloud-dataflow-java/.*$',
-      '^examples/java/.*$',
-      '^release/.*$',
-    ]
-)
-builder.build {
-  publishers {
-    archiveJunit('**/build/test-results/**/*.xml')
-  }
+/** Options that can be used to configure the Beam examples.  */
+interface ExampleOptions : PipelineOptions {
+    @get:Description("Whether to keep jobs running after local process exit")
+    @get:Default.Boolean(false)
+    var keepJobsRunning: Boolean
+
+    @get:Description("Number of workers to use when executing the injector 
pipeline")
+    @get:Default.Integer(1)
+    var injectorNumWorkers: Int
 }
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicAndSubscriptionOptions.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicAndSubscriptionOptions.kt
new file mode 100644
index 0000000..1050f47
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicAndSubscriptionOptions.kt
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.DefaultValueFactory
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+
+/** Options that can be used to configure Pub/Sub topic/subscription in Beam 
examples.  */
+interface ExamplePubsubTopicAndSubscriptionOptions : ExamplePubsubTopicOptions 
{
+    @get:Description("Pub/Sub subscription")
+    @get:Default.InstanceFactory(PubsubSubscriptionFactory::class)
+    var pubsubSubscription: String
+
+    /** Returns a default Pub/Sub subscription based on the project and the 
job names.  */
+    class PubsubSubscriptionFactory : DefaultValueFactory<String> {
+        override fun create(options: PipelineOptions): String {
+            return """
+                projects/
+                ${(options as GcpOptions).project}
+                /subscriptions/
+                ${options.jobName}
+            """.trimIndent()
+        }
+    }
+}
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicOptions.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicOptions.kt
new file mode 100644
index 0000000..341aae5
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExamplePubsubTopicOptions.kt
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+import org.apache.beam.sdk.options.Default
+import org.apache.beam.sdk.options.DefaultValueFactory
+import org.apache.beam.sdk.options.Description
+import org.apache.beam.sdk.options.PipelineOptions
+
+/** Options that can be used to configure Pub/Sub topic in Beam examples.  */
+interface ExamplePubsubTopicOptions : GcpOptions {
+    @get:Description("Pub/Sub topic")
+    @get:Default.InstanceFactory(PubsubTopicFactory::class)
+    var pubsubTopic: String
+
+    /** Returns a default Pub/Sub topic based on the project and the job 
names.  */
+    class PubsubTopicFactory : DefaultValueFactory<String> {
+        override fun create(options: PipelineOptions): String {
+            return """
+                projects/"
+                    ${(options as GcpOptions).project}
+                    /topics/
+                    ${options.jobName}
+            """.trimIndent()
+        }
+    }
+}
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleUtils.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleUtils.kt
new file mode 100644
index 0000000..8144a11
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleUtils.kt
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest
+import com.google.api.client.http.HttpRequestInitializer
+import com.google.api.services.bigquery.Bigquery
+import com.google.api.services.bigquery.model.*
+import com.google.api.services.pubsub.Pubsub
+import com.google.api.services.pubsub.model.Subscription
+import com.google.api.services.pubsub.model.Topic
+import com.google.auth.Credentials
+import com.google.auth.http.HttpCredentialsAdapter
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer
+import org.apache.beam.sdk.PipelineResult
+import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer
+import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer
+import org.apache.beam.sdk.extensions.gcp.util.Transport
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions
+import org.apache.beam.sdk.options.PipelineOptions
+import org.apache.beam.sdk.util.BackOffUtils
+import org.apache.beam.sdk.util.FluentBackoff
+import org.apache.beam.sdk.util.Sleeper
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles
+import org.joda.time.Duration
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+/**
+ * The utility class that sets up and tears down external resources, and 
cancels the streaming
+ * pipelines once the program terminates.
+ *
+ *
+ * It is used to run Beam examples.
+ */
+class ExampleUtils
+/** Do resources and runner options setup.  */
+(private val options: PipelineOptions) {
+    private val bigQueryClient: Bigquery by lazy {
+        newBigQueryClient(options as BigQueryOptions).build()
+    }
+    private val pubsubClient: Pubsub by lazy {
+        newPubsubClient(options as PubsubOptions).build()
+    }
+    private val pipelinesToCancel = Sets.newHashSet<PipelineResult>()
+    private val pendingMessages = Lists.newArrayList<String>()
+
+    /**
+     * Sets up external resources that are required by the example, such as 
Pub/Sub topics and
+     * BigQuery tables.
+     *
+     * @throws IOException if there is a problem setting up the resources
+     */
+    @Throws(IOException::class)
+    fun setup() {
+        val sleeper = Sleeper.DEFAULT
+        val backOff = 
FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff()
+        var lastException: Throwable? = null
+        try {
+            do {
+                try {
+                    setupPubsub()
+                    setupBigQueryTable()
+                    return
+                } catch (e: GoogleJsonResponseException) {
+                    lastException = e
+                }
+
+            } while (BackOffUtils.next(sleeper, backOff))
+        } catch (e: InterruptedException) {
+            Thread.currentThread().interrupt()
+            // Ignore InterruptedException
+        }
+
+        throw RuntimeException(lastException)
+    }
+
+    /**
+     * Sets up the Google Cloud Pub/Sub topic.
+     *
+     *
+     * If the topic doesn't exist, a new topic with the given name will be 
created.
+     *
+     * @throws IOException if there is a problem setting up the Pub/Sub topic
+     */
+    @Throws(IOException::class)
+    fun setupPubsub() {
+        val pubsubOptions = options as ExamplePubsubTopicAndSubscriptionOptions
+        if (pubsubOptions.pubsubTopic.isNotEmpty()) {
+            pendingMessages.add("**********************Set Up 
Pubsub************************")
+            setupPubsubTopic(pubsubOptions.pubsubTopic)
+            pendingMessages.add(
+                    "The Pub/Sub topic has been set up for this example: 
${pubsubOptions.pubsubTopic}")
+
+            if (pubsubOptions.pubsubSubscription.isNotEmpty()) {
+                setupPubsubSubscription(
+                        pubsubOptions.pubsubTopic, 
pubsubOptions.pubsubSubscription)
+                pendingMessages.add(
+                        "The Pub/Sub subscription has been set up for this 
example: ${pubsubOptions.pubsubSubscription}")
+            }
+        }
+    }
+
+    /**
+     * Sets up the BigQuery table with the given schema.
+     *
+     *
+     * If the table already exists, the schema has to match the given one. 
Otherwise, the example
+     * will throw a RuntimeException. If the table doesn't exist, a new table 
with the given schema
+     * will be created.
+     *
+     * @throws IOException if there is a problem setting up the BigQuery table
+     */
+    @Throws(IOException::class)
+    fun setupBigQueryTable() {
+        val bigQueryTableOptions = options as ExampleBigQueryTableOptions
+        if (bigQueryTableOptions.bigQueryDataset != null
+                && bigQueryTableOptions.bigQueryTable != null
+                && bigQueryTableOptions.bigQuerySchema != null) {
+            pendingMessages.add("******************Set Up Big Query 
Table*******************")
+            setupBigQueryTable(
+                    bigQueryTableOptions.project,
+                    bigQueryTableOptions.bigQueryDataset,
+                    bigQueryTableOptions.bigQueryTable,
+                    bigQueryTableOptions.bigQuerySchema)
+            pendingMessages.add(
+                    """
+                        The BigQuery table has been set up for this example:
+                        ${bigQueryTableOptions.project}:
+                        ${bigQueryTableOptions.bigQueryDataset}.
+                        ${bigQueryTableOptions.bigQueryTable}
+                    """.trimIndent()
+            )
+        }
+    }
+
+    /** Tears down external resources that can be deleted upon the example's 
completion.  */
+    private fun tearDown() {
+        pendingMessages.add("*************************Tear 
Down*************************")
+        val pubsubOptions = options as ExamplePubsubTopicAndSubscriptionOptions
+        if (pubsubOptions.pubsubTopic.isNotEmpty()) {
+            try {
+                deletePubsubTopic(pubsubOptions.pubsubTopic)
+                pendingMessages.add(
+                        "The Pub/Sub topic has been deleted: 
${pubsubOptions.pubsubTopic}")
+            } catch (e: IOException) {
+                pendingMessages.add(
+                        "Failed to delete the Pub/Sub topic : 
${pubsubOptions.pubsubTopic}")
+            }
+
+            if (pubsubOptions.pubsubSubscription.isNotEmpty()) {
+                try {
+                    deletePubsubSubscription(pubsubOptions.pubsubSubscription)
+                    pendingMessages.add(
+                            "The Pub/Sub subscription has been deleted: 
${pubsubOptions.pubsubSubscription}")
+                } catch (e: IOException) {
+                    pendingMessages.add(
+                            "Failed to delete the Pub/Sub subscription : 
${pubsubOptions.pubsubSubscription}")
+                }
+
+            }
+        }
+
+        val bigQueryTableOptions = options as ExampleBigQueryTableOptions
+        if (bigQueryTableOptions.bigQueryDataset != null
+                && bigQueryTableOptions.bigQueryTable != null
+                && bigQueryTableOptions.bigQuerySchema != null) {
+            pendingMessages.add(
+                    """
+                        The BigQuery table might contain the example's output, 
and it is not deleted automatically:
+                        ${bigQueryTableOptions.project}:
+                        ${bigQueryTableOptions.bigQueryDataset}.
+                        ${bigQueryTableOptions.bigQueryTable}
+                    """.trimIndent())
+            pendingMessages.add(
+                    "Please go to the Developers Console to delete it 
manually. Otherwise, you may be charged for its usage.")
+        }
+    }
+
+    @Throws(IOException::class)
+    private fun setupBigQueryTable(
+            projectId: String, datasetId: String, tableId: String, schema: 
TableSchema) {
+
+        val datasetService = bigQueryClient.datasets()
+        if (executeNullIfNotFound(datasetService.get(projectId, datasetId)) == 
null) {
+            val newDataset = Dataset()
+                    .setDatasetReference(
+                            
DatasetReference().setProjectId(projectId).setDatasetId(datasetId))
+            datasetService.insert(projectId, newDataset).execute()
+        }
+
+        val tableService = bigQueryClient.tables()
+        val table = executeNullIfNotFound(tableService.get(projectId, 
datasetId, tableId))
+        if (table == null) {
+            val newTable = Table()
+                    .setSchema(schema)
+                    .setTableReference(
+                            TableReference()
+                                    .setProjectId(projectId)
+                                    .setDatasetId(datasetId)
+                                    .setTableId(tableId))
+            tableService.insert(projectId, datasetId, newTable).execute()
+        } else if (table.schema != schema) {
+            throw RuntimeException(
+                    """
+                        Table exists and schemas do not match, expecting:
+                        ${schema.toPrettyString()}, actual:
+                        ${table.schema.toPrettyString()}
+                    """.trimIndent()
+            )
+        }
+    }
+
+    @Throws(IOException::class)
+    private fun setupPubsubTopic(topic: String) {
+        pubsubClient.projects().topics().create(topic, 
Topic().setName(topic)).execute()
+    }
+
+    @Throws(IOException::class)
+    private fun setupPubsubSubscription(topic: String, subscription: String) {
+        val subInfo = Subscription().setAckDeadlineSeconds(60).setTopic(topic)
+        pubsubClient.projects().subscriptions().create(subscription, 
subInfo).execute()
+    }
+
+    /**
+     * Deletes the Google Cloud Pub/Sub topic.
+     *
+     * @throws IOException if there is a problem deleting the Pub/Sub topic
+     */
+    @Throws(IOException::class)
+    private fun deletePubsubTopic(topic: String) {
+        if (executeNullIfNotFound(pubsubClient.projects().topics().get(topic)) 
!= null) {
+            pubsubClient.projects().topics().delete(topic).execute()
+        }
+    }
+
+    /**
+     * Deletes the Google Cloud Pub/Sub subscription.
+     *
+     * @throws IOException if there is a problem deleting the Pub/Sub 
subscription
+     */
+    @Throws(IOException::class)
+    private fun deletePubsubSubscription(subscription: String) {
+        if 
(executeNullIfNotFound(pubsubClient.projects().subscriptions().get(subscription))
 != null) {
+            
pubsubClient.projects().subscriptions().delete(subscription).execute()
+        }
+    }
+
+    /** Waits for the pipeline to finish and cancels it before the program 
exists.  */
+    fun waitToFinish(result: PipelineResult) {
+        pipelinesToCancel.add(result)
+        if (!(options as ExampleOptions).keepJobsRunning) {
+            addShutdownHook(pipelinesToCancel)
+        }
+        try {
+            result.waitUntilFinish()
+        } catch (e: UnsupportedOperationException) {
+            // Do nothing if the given PipelineResult doesn't support 
waitUntilFinish(),
+            // such as EvaluationResults returned by DirectRunner.
+            tearDown()
+            printPendingMessages()
+        } catch (e: Exception) {
+            throw RuntimeException("Failed to wait the pipeline until finish: 
$result")
+        }
+
+    }
+
+    private fun addShutdownHook(pipelineResults: Collection<PipelineResult>) {
+        Runtime.getRuntime()
+                .addShutdownHook(
+                        Thread {
+                            tearDown()
+                            printPendingMessages()
+                            for (pipelineResult in pipelineResults) {
+                                try {
+                                    pipelineResult.cancel()
+                                } catch (e: IOException) {
+                                    println("Failed to cancel the job.")
+                                    println(e.message)
+                                }
+
+                            }
+
+                            for (pipelineResult in pipelineResults) {
+                                var cancellationVerified = false
+                                for (retryAttempts in 6 downTo 1) {
+                                    if (pipelineResult.state.isTerminal) {
+                                        cancellationVerified = true
+                                        break
+                                    } else {
+                                        println(
+                                                "The example pipeline is still 
running. Verifying the cancellation.")
+                                    }
+                                    Uninterruptibles.sleepUninterruptibly(10, 
TimeUnit.SECONDS)
+                                }
+                                if (!cancellationVerified) {
+                                    println(
+                                            "Failed to verify the cancellation 
for job: $pipelineResult")
+                                }
+                            }
+                        })
+    }
+
+    private fun printPendingMessages() {
+        println()
+        println("***********************************************************")
+        println("***********************************************************")
+        for (message in pendingMessages) {
+            println(message)
+        }
+        println("***********************************************************")
+        println("***********************************************************")
+    }
+
+    companion object {
+
+        private const val SC_NOT_FOUND = 404
+
+        /**
+         * \p{L} denotes the category of Unicode letters, so this pattern will 
match on everything that is
+         * not a letter.
+         *
+         *
+         * It is used for tokenizing strings in the wordcount examples.
+         */
+        const val TOKENIZER_PATTERN = "[^\\p{L}]+"
+
+        /** Returns a BigQuery client builder using the specified 
[BigQueryOptions].  */
+        private fun newBigQueryClient(options: BigQueryOptions): 
Bigquery.Builder {
+            return Bigquery.Builder(
+                    Transport.getTransport(),
+                    Transport.getJsonFactory(),
+                    chainHttpRequestInitializer(
+                            options.gcpCredential,
+                            // Do not log 404. It clutters the output and is 
possibly even required by the
+                            // caller.
+                            
RetryHttpRequestInitializer(ImmutableList.of(404))))
+                    .setApplicationName(options.appName)
+                    .setGoogleClientRequestInitializer(options.googleApiTrace)
+        }
+
+        /** Returns a Pubsub client builder using the specified 
[PubsubOptions].  */
+        private fun newPubsubClient(options: PubsubOptions): Pubsub.Builder {
+            return Pubsub.Builder(
+                    Transport.getTransport(),
+                    Transport.getJsonFactory(),
+                    chainHttpRequestInitializer(
+                            options.gcpCredential,
+                            // Do not log 404. It clutters the output and is 
possibly even required by the
+                            // caller.
+                            
RetryHttpRequestInitializer(ImmutableList.of(404))))
+                    .setRootUrl(options.pubsubRootUrl)
+                    .setApplicationName(options.appName)
+                    .setGoogleClientRequestInitializer(options.googleApiTrace)
+        }
+
+        private fun chainHttpRequestInitializer(
+                credential: Credentials?, httpRequestInitializer: 
HttpRequestInitializer): HttpRequestInitializer {
+
+            return credential?.let {
+                ChainingHttpRequestInitializer(
+                        HttpCredentialsAdapter(credential), 
httpRequestInitializer)
+            } ?: kotlin.run {
+                ChainingHttpRequestInitializer(
+                        NullCredentialInitializer(), httpRequestInitializer)
+            }
+        }
+
+        @Throws(IOException::class)
+        private fun <T> executeNullIfNotFound(request: 
AbstractGoogleClientRequest<T>): T? {
+            return try {
+                request.execute()
+            } catch (e: GoogleJsonResponseException) {
+                if (e.statusCode == SC_NOT_FOUND) {
+                    null
+                } else {
+                    throw e
+                }
+            }
+
+        }
+    }
+}
diff --git 
a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt
 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt
new file mode 100644
index 0000000..1e12a77
--- /dev/null
+++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/WriteOneFilePerWindow.kt
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.kotlin.common
+
+import org.apache.beam.sdk.io.FileBasedSink
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints
+import org.apache.beam.sdk.io.TextIO
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
+import org.apache.beam.sdk.io.fs.ResourceId
+import org.apache.beam.sdk.transforms.DoFn
+import org.apache.beam.sdk.transforms.PTransform
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow
+import org.apache.beam.sdk.transforms.windowing.PaneInfo
+import org.apache.beam.sdk.values.PCollection
+import org.apache.beam.sdk.values.PDone
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull
+import org.joda.time.format.ISODateTimeFormat
+
+/**
+ * A [DoFn] that writes elements to files with names deterministically derived 
from the lower
+ * and upper bounds of their key (an [IntervalWindow]).
+ *
+ *
+ * This is test utility code, not for end-users, so examples can be focused on 
their primary
+ * lessons.
+ */
+class WriteOneFilePerWindow(private val filenamePrefix: String, private val 
numShards: Int?) : PTransform<PCollection<String>, PDone>() {
+
+    override fun expand(input: PCollection<String>): PDone {
+        val resource = 
FileBasedSink.convertToFileResourceIfPossible(filenamePrefix)
+        var write = TextIO.write()
+                .to(PerWindowFiles(resource))
+                .withTempDirectory(resource.currentDirectory)
+                .withWindowedWrites()
+        if (numShards != null) {
+            write = write.withNumShards(numShards)
+        }
+        return input.apply(write)
+    }
+
+    /**
+     * A [FilenamePolicy] produces a base file name for a write based on 
metadata about the data
+     * being written. This always includes the shard number and the total 
number of shards. For
+     * windowed writes, it also includes the window and pane index (a sequence 
number assigned to each
+     * trigger firing).
+     */
+    class PerWindowFiles(private val baseFilename: ResourceId) : 
FilenamePolicy() {
+
+        fun filenamePrefixForWindow(window: IntervalWindow): String {
+            val prefix = if (baseFilename.isDirectory) "" else 
firstNonNull(baseFilename.filename, "")
+            return String.format(
+                    "%s-%s-%s", prefix, FORMATTER.print(window.start()), 
FORMATTER.print(window.end()))
+        }
+
+        override fun windowedFilename(
+                shardNumber: Int,
+                numShards: Int,
+                window: BoundedWindow,
+                paneInfo: PaneInfo,
+                outputFileHints: OutputFileHints): ResourceId {
+            val intervalWindow = window as IntervalWindow
+            val filename = String.format(
+                    "%s-%s-of-%s%s",
+                    filenamePrefixForWindow(intervalWindow),
+                    shardNumber,
+                    numShards,
+                    outputFileHints.suggestedFilenameSuffix)
+            return baseFilename
+                    .currentDirectory
+                    .resolve(filename, StandardResolveOptions.RESOLVE_FILE)
+        }
+
+        override fun unwindowedFilename(
+                shardNumber: Int, numShards: Int, outputFileHints: 
OutputFileHints): ResourceId? {
+            throw UnsupportedOperationException("Unsupported.")
+        }
+    }
+
+    companion object {
+        private val FORMATTER = ISODateTimeFormat.hourMinute()
+    }
+}
diff --git 
a/examples/kotlin/src/test/java/org/apache/beam/examples/DebuggingWordCountTestKotlin.java
 
b/examples/kotlin/src/test/java/org/apache/beam/examples/DebuggingWordCountTestKotlin.java
new file mode 100644
index 0000000..4da3b35
--- /dev/null
+++ 
b/examples/kotlin/src/test/java/org/apache/beam/examples/DebuggingWordCountTestKotlin.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.examples.kotlin.DebuggingWordCount;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Files;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link org.apache.beam.examples.kotlin.DebuggingWordCount}. */
+@RunWith(JUnit4.class)
+public class DebuggingWordCountTestKotlin {
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  private String getFilePath(String filePath) {
+    if (filePath.contains(":")) {
+      return filePath.replace("\\", "/").split(":", -1)[1];
+    }
+    return filePath;
+  }
+
+  @Test
+  public void testDebuggingWordCount() throws Exception {
+    File inputFile = tmpFolder.newFile();
+    File outputFile = tmpFolder.newFile();
+    Files.write(
+        "stomach secret Flourish message Flourish here Flourish",
+        inputFile,
+        StandardCharsets.UTF_8);
+    DebuggingWordCount.WordCountOptions options =
+        
TestPipeline.testingPipelineOptions().as(DebuggingWordCount.WordCountOptions.class);
+    options.setInputFile(getFilePath(inputFile.getAbsolutePath()));
+    options.setOutput(getFilePath(outputFile.getAbsolutePath()));
+    DebuggingWordCount.runDebuggingWordCount(options);
+  }
+}
diff --git 
a/examples/kotlin/src/test/java/org/apache/beam/examples/MinimalWordCountTestKotlin.java
 
b/examples/kotlin/src/test/java/org/apache/beam/examples/MinimalWordCountTestKotlin.java
new file mode 100644
index 0000000..8c029ac
--- /dev/null
+++ 
b/examples/kotlin/src/test/java/org/apache/beam/examples/MinimalWordCountTestKotlin.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * To keep {@link org.apache.beam.examples.kotlin.MinimalWordCount} simple, it 
is not factored or
+ * testable. This test file should be maintained with a copy of its code for a 
basic smoke test.
+ */
+@RunWith(JUnit4.class)
+public class MinimalWordCountTestKotlin implements Serializable {
+
+  @Rule public TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  /** A basic smoke test that ensures there is no crash at pipeline 
construction time. */
+  @Test
+  public void testMinimalWordCount() throws Exception {
+    p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
+
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
+        .apply(
+            FlatMapElements.into(TypeDescriptors.strings())
+                .via((String word) -> 
Arrays.asList(word.split("[^a-zA-Z']+"))))
+        .apply(Filter.by((String word) -> !word.isEmpty()))
+        .apply(Count.perElement())
+        .apply(
+            MapElements.into(TypeDescriptors.strings())
+                .via(
+                    (KV<String, Long> wordCount) ->
+                        wordCount.getKey() + ": " + wordCount.getValue()))
+        .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
+  }
+
+  private GcsUtil buildMockGcsUtil() throws IOException {
+    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+    // Any request to open gets a new bogus channel
+    Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+        .then(
+            invocation ->
+                FileChannel.open(
+                    Files.createTempFile("channel-", ".tmp"),
+                    StandardOpenOption.CREATE,
+                    StandardOpenOption.DELETE_ON_CLOSE));
+
+    // Any request for expansion returns a list containing the original GcsPath
+    // This is required to pass validation that occurs in TextIO during apply()
+    Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+        .then(invocation -> ImmutableList.of((GcsPath) 
invocation.getArguments()[0]));
+
+    return mockGcsUtil;
+  }
+}
diff --git 
a/examples/kotlin/src/test/java/org/apache/beam/examples/WindowedWordCountITKotlin.kt
 
b/examples/kotlin/src/test/java/org/apache/beam/examples/WindowedWordCountITKotlin.kt
new file mode 100644
index 0000000..8d25bd4
--- /dev/null
+++ 
b/examples/kotlin/src/test/java/org/apache/beam/examples/WindowedWordCountITKotlin.kt
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples
+
+import org.apache.beam.examples.kotlin.WindowedWordCount
+import org.apache.beam.examples.kotlin.common.ExampleUtils
+import 
org.apache.beam.examples.kotlin.common.WriteOneFilePerWindow.PerWindowFiles
+import org.apache.beam.sdk.PipelineResult
+import org.apache.beam.sdk.io.FileBasedSink
+import org.apache.beam.sdk.io.FileSystems
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
+import org.apache.beam.sdk.options.PipelineOptionsFactory
+import org.apache.beam.sdk.options.StreamingOptions
+import org.apache.beam.sdk.testing.SerializableMatcher
+import org.apache.beam.sdk.testing.StreamingIT
+import org.apache.beam.sdk.testing.TestPipeline
+import org.apache.beam.sdk.testing.TestPipelineOptions
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow
+import org.apache.beam.sdk.util.*
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists
+import org.hamcrest.Description
+import org.hamcrest.Matchers.equalTo
+import org.hamcrest.TypeSafeMatcher
+import org.joda.time.Duration
+import org.joda.time.Instant
+import org.junit.BeforeClass
+import org.junit.Rule
+import org.junit.Test
+import org.junit.experimental.categories.Category
+import org.junit.rules.TestName
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import java.util.*
+import java.util.concurrent.ThreadLocalRandom
+
+/** End-to-end integration test of [WindowedWordCount].  */
+@RunWith(JUnit4::class)
+class WindowedWordCountITKotlin {
+
+    @Rule
+    var testName = TestName()
+
+    /** Options for the [WindowedWordCount] Integration Test.  */
+    interface WindowedWordCountITOptions : WindowedWordCount.Options, 
TestPipelineOptions, StreamingOptions
+
+    @Test
+    @Throws(Exception::class)
+    fun testWindowedWordCountInBatchDynamicSharding() {
+        val options = batchOptions()
+        // This is the default value, but make it explicit.
+        options.numShards = null
+        testWindowedWordCountPipeline(options)
+    }
+
+    @Test
+    @Throws(Exception::class)
+    fun testWindowedWordCountInBatchStaticSharding() {
+        val options = batchOptions()
+        options.numShards = 3
+        testWindowedWordCountPipeline(options)
+    }
+
+    // TODO: add a test with streaming and dynamic sharding after resolving
+    // https://issues.apache.org/jira/browse/BEAM-1438
+
+    @Test
+    @Category(StreamingIT::class)
+    @Throws(Exception::class)
+    fun testWindowedWordCountInStreamingStaticSharding() {
+        val options = streamingOptions()
+        options.numShards = 3
+        testWindowedWordCountPipeline(options)
+    }
+
+    @Throws(Exception::class)
+    private fun defaultOptions(): WindowedWordCountITOptions {
+        val options = 
TestPipeline.testingPipelineOptions().`as`(WindowedWordCountITOptions::class.java)
+        options.inputFile = DEFAULT_INPUT
+        options.testTimeoutSeconds = 1200L
+
+        options.minTimestampMillis = 0L
+        options.minTimestampMillis = Duration.standardHours(1).millis
+        options.windowSize = 10
+
+        options.output = FileSystems.matchNewResource(options.tempRoot, true)
+                .resolve(
+                        String.format(
+                                
"WindowedWordCountITKotlin.%s-%tFT%<tH:%<tM:%<tS.%<tL+%s",
+                                testName.methodName, Date(), 
ThreadLocalRandom.current().nextInt()),
+                        StandardResolveOptions.RESOLVE_DIRECTORY)
+                .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY)
+                .resolve("results", StandardResolveOptions.RESOLVE_FILE)
+                .toString()
+        return options
+    }
+
+    @Throws(Exception::class)
+    private fun streamingOptions(): WindowedWordCountITOptions {
+        val options = defaultOptions()
+        options.isStreaming = true
+        return options
+    }
+
+    @Throws(Exception::class)
+    private fun batchOptions(): WindowedWordCountITOptions {
+        val options = defaultOptions()
+        // This is the default value, but make it explicit
+        options.isStreaming = false
+        return options
+    }
+
+    @Throws(Exception::class)
+    private fun testWindowedWordCountPipeline(options: 
WindowedWordCountITOptions) {
+
+        val output = 
FileBasedSink.convertToFileResourceIfPossible(options.output)
+        val filenamePolicy = PerWindowFiles(output)
+
+        val expectedOutputFiles = 
Lists.newArrayListWithCapacity<ShardedFile>(6)
+
+        for (startMinute in ImmutableList.of(0, 10, 20, 30, 40, 50)) {
+            val windowStart = 
Instant(options.minTimestampMillis).plus(Duration.standardMinutes(startMinute.toLong()))
+            val filePrefix = filenamePolicy.filenamePrefixForWindow(
+                    IntervalWindow(windowStart, 
windowStart.plus(Duration.standardMinutes(10))))
+            expectedOutputFiles.add(
+                    NumberedShardedFile(
+                            output
+                                    .currentDirectory
+                                    .resolve(filePrefix, 
StandardResolveOptions.RESOLVE_FILE)
+                                    .toString() + "*"))
+        }
+
+        val inputFile = ExplicitShardedFile(setOf(options.inputFile))
+
+        // For this integration test, input is tiny and we can build the 
expected counts
+        val expectedWordCounts = TreeMap<String, Long>()
+        for (line in inputFile.readFilesWithRetries(Sleeper.DEFAULT, 
BACK_OFF_FACTORY.backoff())) {
+            val words = 
line.split(ExampleUtils.TOKENIZER_PATTERN.toRegex()).toTypedArray()
+
+            for (word in words) {
+                if (word.isNotEmpty()) {
+                    expectedWordCounts[word] = 
MoreObjects.firstNonNull(expectedWordCounts[word], 0L) + 1L
+                }
+            }
+        }
+
+        options.onSuccessMatcher = WordCountsMatcher(expectedWordCounts, 
expectedOutputFiles)
+
+        WindowedWordCount.runWindowedWordCount(options)
+    }
+
+    /**
+     * A matcher that bakes in expected word counts, so they can be read 
directly via some other
+     * mechanism, and compares a sharded output file with the result.
+     */
+    private class WordCountsMatcher(
+            private val expectedWordCounts: SortedMap<String, Long>, private 
val outputFiles: List<ShardedFile>) : TypeSafeMatcher<PipelineResult>(), 
SerializableMatcher<PipelineResult> {
+        private var actualCounts: SortedMap<String, Long>? = null
+
+        public override fun matchesSafely(pipelineResult: PipelineResult): 
Boolean {
+            try {
+                // Load output data
+                val outputLines = ArrayList<String>()
+                for (outputFile in outputFiles) {
+                    outputLines.addAll(
+                            outputFile.readFilesWithRetries(Sleeper.DEFAULT, 
BACK_OFF_FACTORY.backoff()))
+                }
+
+                // Since the windowing is nondeterministic we only check the 
sums
+                actualCounts = TreeMap()
+                for (line in outputLines) {
+                    val splits = line.split(": ".toRegex()).toTypedArray()
+                    val word = splits[0]
+                    val count = java.lang.Long.parseLong(splits[1])
+                    (actualCounts as java.util.Map<String, Long>).merge(word, 
count) { a, b -> a + b }
+                }
+                return actualCounts == expectedWordCounts
+            } catch (e: Exception) {
+                throw RuntimeException(
+                        String.format("Failed to read from sharded output: %s 
due to exception", outputFiles), e)
+            }
+
+        }
+
+        override fun describeTo(description: Description) {
+            equalTo(expectedWordCounts).describeTo(description)
+        }
+
+        public override fun describeMismatchSafely(pResult: PipelineResult, 
description: Description) {
+            equalTo(expectedWordCounts).describeMismatch(actualCounts, 
description)
+        }
+    }
+
+    companion object {
+
+        private const val DEFAULT_INPUT = 
"gs://apache-beam-samples/shakespeare/sonnets.txt"
+        private const val MAX_READ_RETRIES = 4
+        private val DEFAULT_SLEEP_DURATION = Duration.standardSeconds(10L)
+        internal val BACK_OFF_FACTORY = FluentBackoff.DEFAULT
+                .withInitialBackoff(DEFAULT_SLEEP_DURATION)
+                .withMaxRetries(MAX_READ_RETRIES)
+
+        @BeforeClass
+        fun setUp() {
+            PipelineOptionsFactory.register(TestPipelineOptions::class.java)
+        }
+    }
+}
diff --git 
a/examples/kotlin/src/test/java/org/apache/beam/examples/WordCountITKotlin.java 
b/examples/kotlin/src/test/java/org/apache/beam/examples/WordCountITKotlin.java
new file mode 100644
index 0000000..ef016b9
--- /dev/null
+++ 
b/examples/kotlin/src/test/java/org/apache/beam/examples/WordCountITKotlin.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.util.Date;
+import org.apache.beam.examples.kotlin.WordCount;
+import org.apache.beam.examples.kotlin.WordCount.WordCountOptions;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end tests of WordCount. */
+@RunWith(JUnit4.class)
+public class WordCountITKotlin {
+  private static final String DEFAULT_INPUT =
+      "gs://apache-beam-samples/shakespeare/winterstale-personae";
+  private static final String DEFAULT_OUTPUT_CHECKSUM = 
"ebf895e7324e8a3edc72e7bcc96fa2ba7f690def";
+
+  /**
+   * Options for the WordCount Integration Test.
+   *
+   * <p>Define expected output file checksum to verify WordCount pipeline 
result with customized
+   * input.
+   */
+  public interface WordCountITOptions extends TestPipelineOptions, 
WordCountOptions {}
+
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+  }
+
+  @Test
+  public void testE2EWordCount() throws Exception {
+    WordCountITOptions options = 
TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
+
+    options.setInputFile(DEFAULT_INPUT);
+    options.setOutput(
+        FileSystems.matchNewResource(options.getTempRoot(), true)
+            .resolve(
+                String.format("WordCountITKotlin-%tF-%<tH-%<tM-%<tS-%<tL", new 
Date()),
+                StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("results", StandardResolveOptions.RESOLVE_FILE)
+            .toString());
+    options.setOnSuccessMatcher(
+        new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + 
"*-of-*"));
+
+    WordCount.runWordCount(options);
+  }
+}
diff --git a/examples/kotlin/src/test/resources/LICENSE 
b/examples/kotlin/src/test/resources/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/examples/kotlin/src/test/resources/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/settings.gradle b/settings.gradle
index b974ae1..2aa3188 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,6 +20,8 @@ rootProject.name = "beam"
 
 include ":release"
 
+include "beam-examples-kotlin"
+project(":beam-examples-kotlin").dir = file("examples/kotlin")
 include "beam-examples-java"
 project(":beam-examples-java").dir = file("examples/java")
 include "beam-model-fn-execution"

Reply via email to