Rearranging the code and renaming certain classes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc4c60eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc4c60eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc4c60eb

Branch: refs/heads/master
Commit: bc4c60ebfa49ad050367533809c265375e8c0b01
Parents: 37a9b29
Author: kl0u <[email protected]>
Authored: Mon Feb 29 12:38:56 2016 +0100
Committer: Davor Bonaci <[email protected]>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../dataflow/FlinkJobExecutionEnvironment.java  | 237 -----------------
 .../FlinkPipelineExecutionEnvironment.java      | 255 +++++++++++++++++++
 .../flink/dataflow/FlinkPipelineOptions.java    |  23 +-
 .../flink/dataflow/FlinkPipelineRunner.java     |   6 +-
 .../examples/streaming/AutoComplete.java        |   7 +-
 .../examples/streaming/JoinExamples.java        |   5 +-
 .../KafkaWindowedWordCountExample.java          |   3 +
 .../examples/streaming/WindowedWordCount.java   |   3 +
 .../FlinkStreamingTransformTranslators.java     |   2 -
 .../FlinkStreamingTranslationContext.java       |   9 +-
 .../streaming/FlinkAbstractParDoWrapper.java    |  10 +-
 .../FlinkGroupAlsoByWindowWrapper.java          |   2 -
 .../io/FlinkStreamingCreateFunction.java        |   7 +-
 .../streaming/io/UnboundedSourceWrapper.java    |  11 +-
 .../dataartisans/flink/dataflow/AvroITCase.java |   4 +-
 .../flink/dataflow/FlattenizeITCase.java        |   2 +-
 .../flink/dataflow/FlinkTestPipeline.java       |  14 +-
 .../flink/dataflow/JoinExamplesITCase.java      |   2 +-
 .../flink/dataflow/MaybeEmptyTestITCase.java    |   2 +-
 .../flink/dataflow/ParDoMultiOutputITCase.java  |   2 +-
 .../flink/dataflow/ReadSourceITCase.java        |   2 +-
 .../dataflow/RemoveDuplicatesEmptyITCase.java   |   2 +-
 .../flink/dataflow/RemoveDuplicatesITCase.java  |   2 +-
 .../flink/dataflow/SideInputITCase.java         |   2 +-
 .../flink/dataflow/TfIdfITCase.java             |   2 +-
 .../dataflow/TopWikipediaSessionsITCase.java    | 144 -----------
 .../flink/dataflow/WordCountITCase.java         |   2 +-
 .../flink/dataflow/WordCountJoin2ITCase.java    |   2 +-
 .../flink/dataflow/WordCountJoin3ITCase.java    |   2 +-
 .../flink/dataflow/WriteSinkITCase.java         |   2 +-
 .../streaming/GroupAlsoByWindowTest.java        |   8 +-
 .../streaming/TopWikipediaSessionsITCase.java   | 145 +++++++++++
 32 files changed, 481 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
deleted file mode 100644
index 91b2f64..0000000
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.dataartisans.flink.dataflow;
-
-import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
-import 
com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
-import 
com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class FlinkJobExecutionEnvironment {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class);
-
-       private final FlinkPipelineOptions options;
-
-       /**
-        * The Flink Batch execution environment. This is instantiated to 
either a
-        * {@link org.apache.flink.api.java.CollectionEnvironment},
-        * a {@link org.apache.flink.api.java.LocalEnvironment} or
-        * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on 
the configuration
-        * options.
-        */
-       private ExecutionEnvironment flinkBatchEnv;
-
-
-       /**
-        * The Flink Streaming execution environment. This is instantiated to 
either a
-        * {@link 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
-        * a {@link 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
-        * on the configuration options, and more specifically, the url of the 
master url.
-        */
-       private StreamExecutionEnvironment flinkStreamEnv;
-
-       /**
-        * Translator for this FlinkPipelineRunner. Its role is to translate 
the Dataflow operators to
-        * their Flink based counterparts. Based on the options provided by the 
user, if we have a streaming job,
-        * this is instantiated to a FlinkStreamingPipelineTranslator. In other 
case, i.e. a batch job,
-        * a FlinkBatchPipelineTranslator is created.
-        */
-       private FlinkPipelineTranslator flinkPipelineTranslator;
-
-       public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) {
-               if (options == null) {
-                       throw new IllegalArgumentException("Options in the 
FlinkJobExecutionEnvironment cannot be NULL.");
-               }
-               this.options = options;
-               this.createJobEnvironment();
-               this.createJobGraphTranslator();
-       }
-
-       /**
-        * Depending on the type of job (Streaming or Batch) and the 
user-specified options,
-        * this method creates the adequate ExecutionEnvironment.
-        */
-       private void createJobEnvironment() {
-               if (options.isStreaming()) {
-                       LOG.info("Creating the required STREAMING 
Environment.");
-                       createStreamExecutionEnvironment();
-               } else {
-                       LOG.info("Creating the required BATCH Environment.");
-                       createBatchExecutionEnvironment();
-               }
-       }
-
-       /**
-        * Depending on the type of job (Streaming or Batch), this method 
creates the adequate job graph
-        * translator. In the case of batch, it will work with DataSets, while 
for streaming, it will work
-        * with DataStreams.
-        */
-       private void createJobGraphTranslator() {
-               checkInitializationState();
-               if (this.flinkPipelineTranslator != null) {
-                       throw new IllegalStateException("JobGraphTranslator 
already initialized.");
-               }
-
-               this.flinkPipelineTranslator = options.isStreaming() ?
-                               new 
FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
-                               new FlinkBatchPipelineTranslator(flinkBatchEnv, 
options);
-       }
-
-       public void translate(Pipeline pipeline) {
-               checkInitializationState();
-               if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
-                       createJobEnvironment();
-               }
-               if (this.flinkPipelineTranslator == null) {
-                       createJobGraphTranslator();
-               }
-               this.flinkPipelineTranslator.translate(pipeline);
-       }
-
-       public JobExecutionResult executeJob() throws Exception {
-               if (options.isStreaming()) {
-
-                       System.out.println("Plan: " + 
this.flinkStreamEnv.getExecutionPlan());
-
-                       if (this.flinkStreamEnv == null) {
-                               throw new 
RuntimeException("JobExecutionEnvironment not initialized.");
-                       }
-                       if (this.flinkPipelineTranslator == null) {
-                               throw new RuntimeException("JobGraphTranslator 
not initialized.");
-                       }
-                       return this.flinkStreamEnv.execute();
-               } else {
-                       if (this.flinkBatchEnv == null) {
-                               throw new 
RuntimeException("JobExecutionEnvironment not initialized.");
-                       }
-                       if (this.flinkPipelineTranslator == null) {
-                               throw new RuntimeException("JobGraphTranslator 
not initialized.");
-                       }
-                       return this.flinkBatchEnv.execute();
-               }
-       }
-
-       /**
-        * If the submitted job is a batch processing job, this method creates 
the adequate
-        * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} 
depending
-        * on the user-specified options.
-        */
-       private void createBatchExecutionEnvironment() {
-               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-                       throw new RuntimeException("JobExecutionEnvironment 
already initialized.");
-               }
-
-               String masterUrl = options.getFlinkMaster();
-               this.flinkStreamEnv = null;
-
-               // depending on the master, create the right environment.
-               if (masterUrl.equals("[local]")) {
-                       this.flinkBatchEnv = 
ExecutionEnvironment.createLocalEnvironment();
-               } else if (masterUrl.equals("[collection]")) {
-                       this.flinkBatchEnv = new CollectionEnvironment();
-               } else if (masterUrl.equals("[auto]")) {
-                       this.flinkBatchEnv = 
ExecutionEnvironment.getExecutionEnvironment();
-               } else if (masterUrl.matches(".*:\\d*")) {
-                       String[] parts = masterUrl.split(":");
-                       List<String> stagingFiles = options.getFilesToStage();
-                       this.flinkBatchEnv = 
ExecutionEnvironment.createRemoteEnvironment(parts[0],
-                                       Integer.parseInt(parts[1]),
-                                       stagingFiles.toArray(new 
String[stagingFiles.size()]));
-               } else {
-                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting 
to [auto].", masterUrl);
-                       this.flinkBatchEnv = 
ExecutionEnvironment.getExecutionEnvironment();
-               }
-
-               // set the correct parallelism.
-               if (options.getParallelism() != -1 && !(this.flinkBatchEnv 
instanceof CollectionEnvironment)) {
-                       
this.flinkBatchEnv.setParallelism(options.getParallelism());
-               }
-
-               // set parallelism in the options (required by some execution 
code)
-               options.setParallelism(flinkBatchEnv.getParallelism());
-       }
-
-       /**
-        * If the submitted job is a stream processing job, this method creates 
the adequate
-        * Flink {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
-        * on the user-specified options.
-        */
-       private void createStreamExecutionEnvironment() {
-               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-                       throw new RuntimeException("JobExecutionEnvironment 
already initialized.");
-               }
-
-               String masterUrl = options.getFlinkMaster();
-               this.flinkBatchEnv = null;
-
-               // depending on the master, create the right environment.
-               if (masterUrl.equals("[local]")) {
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.createLocalEnvironment();
-               } else if (masterUrl.equals("[auto]")) {
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               } else if (masterUrl.matches(".*:\\d*")) {
-                       String[] parts = masterUrl.split(":");
-                       List<String> stagingFiles = options.getFilesToStage();
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
-                                       Integer.parseInt(parts[1]), 
stagingFiles.toArray(new String[stagingFiles.size()]));
-               } else {
-                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting 
to [auto].", masterUrl);
-                       this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               }
-
-               // set the correct parallelism.
-               if (options.getParallelism() != -1) {
-                       
this.flinkStreamEnv.setParallelism(options.getParallelism());
-               }
-
-               // set parallelism in the options (required by some execution 
code)
-               options.setParallelism(flinkStreamEnv.getParallelism());
-
-               // although we do not use the generated timestamps,
-               // enabling timestamps is needed for the watermarks.
-               this.flinkStreamEnv.getConfig().enableTimestamps();
-
-               
this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-               this.flinkStreamEnv.enableCheckpointing(1000);
-               this.flinkStreamEnv.setNumberOfExecutionRetries(5);
-
-               LOG.info("Setting execution retry delay to 3 sec");
-               this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000);
-       }
-
-       private void checkInitializationState() {
-               if (this.options == null) {
-                       throw new 
IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet.");
-               }
-
-               if (options.isStreaming() && this.flinkBatchEnv != null) {
-                       throw new IllegalStateException("Attempted to run a 
Streaming Job with a Batch Execution Environment.");
-               } else if (!options.isStreaming() && this.flinkStreamEnv != 
null) {
-                       throw new IllegalStateException("Attempted to run a 
Batch Job with a Streaming Execution Environment.");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..a1372bd
--- /dev/null
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow;
+
+import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator;
+import 
com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator;
+import 
com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class FlinkPipelineExecutionEnvironment {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+       private final FlinkPipelineOptions options;
+
+       /**
+        * The Flink Batch execution environment. This is instantiated to 
either a
+        * {@link org.apache.flink.api.java.CollectionEnvironment},
+        * a {@link org.apache.flink.api.java.LocalEnvironment} or
+        * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on 
the configuration
+        * options.
+        */
+       private ExecutionEnvironment flinkBatchEnv;
+
+
+       /**
+        * The Flink Streaming execution environment. This is instantiated to 
either a
+        * {@link 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+        * a {@link 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+        * on the configuration options, and more specifically, the url of the 
master.
+        */
+       private StreamExecutionEnvironment flinkStreamEnv;
+
+       /**
+        * Translator for this FlinkPipelineRunner. Its role is to translate 
the Beam operators to
+        * their Flink counterparts. Based on the options provided by the user, 
if we have a streaming job,
+        * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. 
In other case, i.e. a batch job,
+        * a {@link FlinkBatchPipelineTranslator} is created.
+        */
+       private FlinkPipelineTranslator flinkPipelineTranslator;
+
+       /**
+        * Creates a {@link FlinkPipelineExecutionEnvironment} with the 
user-specified parameters in the
+        * provided {@link FlinkPipelineOptions}.
+        *
+        * @param options the user-defined pipeline options.
+        * */
+       public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+               this.options = Preconditions.checkNotNull(options);
+               this.createPipelineExecutionEnvironment();
+               this.createPipelineTranslator();
+       }
+
+       /**
+        * Depending on the type of job (Streaming or Batch) and the 
user-specified options,
+        * this method creates the adequate ExecutionEnvironment.
+        */
+       private void createPipelineExecutionEnvironment() {
+               if (options.isStreaming()) {
+                       createStreamExecutionEnvironment();
+               } else {
+                       createBatchExecutionEnvironment();
+               }
+       }
+
+       /**
+        * Depending on the type of job (Streaming or Batch), this method 
creates the adequate job graph
+        * translator. In the case of batch, it will work with {@link 
org.apache.flink.api.java.DataSet},
+        * while for streaming, it will work with {@link 
org.apache.flink.streaming.api.datastream.DataStream}.
+        */
+       private void createPipelineTranslator() {
+               checkInitializationState();
+               if (this.flinkPipelineTranslator != null) {
+                       throw new 
IllegalStateException("FlinkPipelineTranslator already initialized.");
+               }
+
+               this.flinkPipelineTranslator = options.isStreaming() ?
+                               new 
FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+                               new FlinkBatchPipelineTranslator(flinkBatchEnv, 
options);
+       }
+
+       /**
+        * Depending on if the job is a Streaming or a Batch one, this method 
creates
+        * the necessary execution environment and pipeline translator, and 
translates
+        * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program 
into
+        * a {@link org.apache.flink.api.java.DataSet} or {@link 
org.apache.flink.streaming.api.datastream.DataStream}
+        * one.
+        * */
+       public void translate(Pipeline pipeline) {
+               checkInitializationState();
+               if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+                       createPipelineExecutionEnvironment();
+               }
+               if (this.flinkPipelineTranslator == null) {
+                       createPipelineTranslator();
+               }
+               this.flinkPipelineTranslator.translate(pipeline);
+       }
+
+       /**
+        * Launches the program execution.
+        * */
+       public JobExecutionResult executePipeline() throws Exception {
+               if (options.isStreaming()) {
+                       if (this.flinkStreamEnv == null) {
+                               throw new 
RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+                       }
+                       if (this.flinkPipelineTranslator == null) {
+                               throw new 
RuntimeException("FlinkPipelineTranslator not initialized.");
+                       }
+                       return this.flinkStreamEnv.execute();
+               } else {
+                       if (this.flinkBatchEnv == null) {
+                               throw new 
RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+                       }
+                       if (this.flinkPipelineTranslator == null) {
+                               throw new 
RuntimeException("FlinkPipelineTranslator not initialized.");
+                       }
+                       return this.flinkBatchEnv.execute();
+               }
+       }
+
+       /**
+        * If the submitted job is a batch processing job, this method creates 
the adequate
+        * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} 
depending
+        * on the user-specified options.
+        */
+       private void createBatchExecutionEnvironment() {
+               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+                       throw new 
RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+               }
+
+               LOG.info("Creating the required Batch Execution Environment.");
+
+               String masterUrl = options.getFlinkMaster();
+               this.flinkStreamEnv = null;
+
+               // depending on the master, create the right environment.
+               if (masterUrl.equals("[local]")) {
+                       this.flinkBatchEnv = 
ExecutionEnvironment.createLocalEnvironment();
+               } else if (masterUrl.equals("[collection]")) {
+                       this.flinkBatchEnv = new CollectionEnvironment();
+               } else if (masterUrl.equals("[auto]")) {
+                       this.flinkBatchEnv = 
ExecutionEnvironment.getExecutionEnvironment();
+               } else if (masterUrl.matches(".*:\\d*")) {
+                       String[] parts = masterUrl.split(":");
+                       List<String> stagingFiles = options.getFilesToStage();
+                       this.flinkBatchEnv = 
ExecutionEnvironment.createRemoteEnvironment(parts[0],
+                                       Integer.parseInt(parts[1]),
+                                       stagingFiles.toArray(new 
String[stagingFiles.size()]));
+               } else {
+                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting 
to [auto].", masterUrl);
+                       this.flinkBatchEnv = 
ExecutionEnvironment.getExecutionEnvironment();
+               }
+
+               // set the correct parallelism.
+               if (options.getParallelism() != -1 && !(this.flinkBatchEnv 
instanceof CollectionEnvironment)) {
+                       
this.flinkBatchEnv.setParallelism(options.getParallelism());
+               }
+
+               // set parallelism in the options (required by some execution 
code)
+               options.setParallelism(flinkBatchEnv.getParallelism());
+       }
+
+       /**
+        * If the submitted job is a stream processing job, this method creates 
the adequate
+        * Flink {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+        * on the user-specified options.
+        */
+       private void createStreamExecutionEnvironment() {
+               if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+                       throw new 
RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+               }
+
+               LOG.info("Creating the required Streaming Environment.");
+
+               String masterUrl = options.getFlinkMaster();
+               this.flinkBatchEnv = null;
+
+               // depending on the master, create the right environment.
+               if (masterUrl.equals("[local]")) {
+                       this.flinkStreamEnv = 
StreamExecutionEnvironment.createLocalEnvironment();
+               } else if (masterUrl.equals("[auto]")) {
+                       this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               } else if (masterUrl.matches(".*:\\d*")) {
+                       String[] parts = masterUrl.split(":");
+                       List<String> stagingFiles = options.getFilesToStage();
+                       this.flinkStreamEnv = 
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+                                       Integer.parseInt(parts[1]), 
stagingFiles.toArray(new String[stagingFiles.size()]));
+               } else {
+                       LOG.warn("Unrecognized Flink Master URL {}. Defaulting 
to [auto].", masterUrl);
+                       this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               }
+
+               // set the correct parallelism.
+               if (options.getParallelism() != -1) {
+                       
this.flinkStreamEnv.setParallelism(options.getParallelism());
+               }
+
+               // set parallelism in the options (required by some execution 
code)
+               options.setParallelism(flinkStreamEnv.getParallelism());
+
+               // although we do not use the generated timestamps,
+               // enabling timestamps is needed for the watermarks.
+               this.flinkStreamEnv.getConfig().enableTimestamps();
+               
this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               // for the following 2 parameters, a value of -1 means that 
Flink will use
+               // the default values as specified in the configuration.
+               
this.flinkStreamEnv.setNumberOfExecutionRetries(options.getNumberOfExecutionRetries());
+               
this.flinkStreamEnv.getConfig().setExecutionRetryDelay(options.getExecutionRetryDelay());
+
+               // A value of -1 corresponds to disabled checkpointing (see 
CheckpointConfig in Flink).
+               // If the value is not -1, then the validity checks are applied.
+               // By default, checkpointing is disabled.
+               long checkpointInterval = options.getCheckpointingInterval();
+               if(checkpointInterval != -1) {
+                       if (checkpointInterval < 1) {
+                               throw new IllegalArgumentException("The 
checkpoint interval must be positive");
+                       }
+                       
this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+               }
+       }
+
+       private void checkInitializationState() {
+               if (options.isStreaming() && this.flinkBatchEnv != null) {
+                       throw new IllegalStateException("Attempted to run a 
Streaming Job with a Batch Execution Environment.");
+               } else if (!options.isStreaming() && this.flinkStreamEnv != 
null) {
+                       throw new IllegalStateException("Attempted to run a 
Batch Job with a Streaming Execution Environment.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
index e746f41..2429cac 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java
@@ -66,11 +66,26 @@ public interface FlinkPipelineOptions extends 
PipelineOptions, ApplicationNameOp
        String getFlinkMaster();
        void setFlinkMaster(String value);
 
-       /**
-        * The degree of parallelism to be used when parallelizing operations 
onto workers.
-        */
-       @Description("The degree of parallelism to be used when parallelizing 
operations onto workers.")
+       @Description("The degree of parallelism to be used when distributing 
operations onto workers.")
        @Default.Integer(-1)
        Integer getParallelism();
        void setParallelism(Integer value);
+
+       @Description("The interval between consecutive checkpoints (i.e. 
snapshots of the current pipeline state used for " +
+                       "fault tolerance).")
+       @Default.Long(-1L)
+       Long getCheckpointingInterval();
+       void setCheckpointingInterval(Long interval);
+
+       @Description("Sets the number of times that failed tasks are 
re-executed. " +
+                       "A value of zero effectively disables fault tolerance. 
A value of -1 indicates " +
+                       "that the system default value (as defined in the 
configuration) should be used.")
+       @Default.Integer(-1)
+       Integer getNumberOfExecutionRetries();
+       void setNumberOfExecutionRetries(Integer retries);
+
+       @Description("Sets the delay between executions. A value of {@code -1} 
indicates that the default value should be used.")
+       @Default.Long(-1L)
+       Long getExecutionRetryDelay();
+       void setExecutionRetryDelay(Long delay);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
index ebd2691..7ea8370 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java
@@ -54,7 +54,7 @@ public class FlinkPipelineRunner extends 
PipelineRunner<FlinkRunnerResult> {
         */
        private final FlinkPipelineOptions options;
 
-       private final FlinkJobExecutionEnvironment flinkJobEnv;
+       private final FlinkPipelineExecutionEnvironment flinkJobEnv;
 
        /**
         * Construct a runner from the provided options.
@@ -103,7 +103,7 @@ public class FlinkPipelineRunner extends 
PipelineRunner<FlinkRunnerResult> {
 
        private FlinkPipelineRunner(FlinkPipelineOptions options) {
                this.options = options;
-               this.flinkJobEnv = new FlinkJobExecutionEnvironment(options);
+               this.flinkJobEnv = new 
FlinkPipelineExecutionEnvironment(options);
        }
 
        @Override
@@ -118,7 +118,7 @@ public class FlinkPipelineRunner extends 
PipelineRunner<FlinkRunnerResult> {
                
                JobExecutionResult result;
                try {
-                       result = this.flinkJobEnv.executeJob();
+                       result = this.flinkJobEnv.executePipeline();
                } catch (Exception e) {
                        LOG.error("Pipeline execution failed", e);
                        throw new RuntimeException("Pipeline execution failed", 
e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
index 711d9fb..493fb25 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java
@@ -325,7 +325,9 @@ public class AutoComplete {
   * Takes as input the top candidates per prefix, and emits an entity
    * suitable for writing to Datastore.
    */
-  static class FormatForPerTaskLocalFile extends DoFn<KV<String, 
List<CompletionCandidate>>, String> {
+  static class FormatForPerTaskLocalFile extends DoFn<KV<String, 
List<CompletionCandidate>>, String>
+          implements DoFn.RequiresWindowAccess{
+
     private static final long serialVersionUID = 0;
 
     @Override
@@ -357,6 +359,9 @@ public class AutoComplete {
   public static void main(String[] args) throws IOException {
     Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     options.setStreaming(true);
+    options.setCheckpointingInterval(1000L);
+    options.setNumberOfExecutionRetries(5);
+    options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkPipelineRunner.class);
 
     PTransform<? super PBegin, PCollection<String>> readSource =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
index 9a5db64..60f6788 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java
@@ -124,9 +124,10 @@ public class JoinExamples {
 
        public static void main(String[] args) throws Exception {
                Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-               // make it a streaming example.
                options.setStreaming(true);
+               options.setCheckpointingInterval(1000L);
+               options.setNumberOfExecutionRetries(5);
+               options.setExecutionRetryDelay(3000L);
                options.setRunner(FlinkPipelineRunner.class);
 
                PTransform<? super PBegin, PCollection<String>> readSourceA =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
index 42d3d88..dba2721 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java
@@ -104,6 +104,9 @@ public class KafkaWindowedWordCountExample {
                KafkaStreamingWordCountOptions options = 
PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class);
                options.setJobName("KafkaExample");
                options.setStreaming(true);
+               options.setCheckpointingInterval(1000L);
+               options.setNumberOfExecutionRetries(5);
+               options.setExecutionRetryDelay(3000L);
                options.setRunner(FlinkPipelineRunner.class);
 
                System.out.println(options.getKafkaTopic() +" "+ 
options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() );

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
index b539245..37dc39a 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java
@@ -99,6 +99,9 @@ public class WindowedWordCount {
                options.setStreaming(true);
                options.setWindowSize(10L);
                options.setSlide(5L);
+               options.setCheckpointingInterval(1000L);
+               options.setNumberOfExecutionRetries(5);
+               options.setExecutionRetryDelay(3000L);
                options.setRunner(FlinkPipelineRunner.class);
 
                LOG.info("Windpwed WordCount with Sliding Windows of " + 
options.getWindowSize() +

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
index 46d3e36..27cc923 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
@@ -69,7 +69,6 @@ public class FlinkStreamingTransformTranslators {
 
        // here you can find all the available translators.
        static {
-
                TRANSLATORS.put(Create.Values.class, new 
CreateStreamingTranslator());
                TRANSLATORS.put(Read.Unbounded.class, new 
UnboundedReadSourceTranslator());
                TRANSLATORS.put(ParDo.Bound.class, new 
ParDoBoundStreamingTranslator());
@@ -79,7 +78,6 @@ public class FlinkStreamingTransformTranslators {
                TRANSLATORS.put(Combine.PerKey.class, new 
CombinePerKeyTranslator());
                TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslator());
                TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiStreamingTranslator());
-
        }
 
        public static 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> 
getTranslator(PTransform<?, ?> transform) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
index df68e50..7c4ab93 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
@@ -18,7 +18,10 @@ package com.dataartisans.flink.dataflow.translation;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.*;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.base.Preconditions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -40,8 +43,8 @@ public class FlinkStreamingTranslationContext {
        private AppliedPTransform<?, ?, ?> currentTransform;
 
        public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, 
PipelineOptions options) {
-               this.env = env;
-               this.options = options;
+               this.env = Preconditions.checkNotNull(env);
+               this.options = Preconditions.checkNotNull(options);
                this.dataStreams = new HashMap<>();
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 71f9c7f..dfb2b7d 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -116,10 +116,10 @@ public abstract class FlinkAbstractParDoWrapper<IN, 
OUTDF, OUTFL> extends RichFl
 
                @Override
                public BoundedWindow window() {
-//                     if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-//                             throw new UnsupportedOperationException(
-//                                             "window() is only available in 
the context of a DoFn marked as RequiresWindow.");
-//                     }
+                       if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+                               throw new UnsupportedOperationException(
+                                               "window() is only available in 
the context of a DoFn marked as RequiresWindow.");
+                       }
 
                        Collection<? extends BoundedWindow> windows = 
this.element.getWindows();
                        if (windows.size() != 1) {
@@ -211,7 +211,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, 
OUTFL> extends RichFl
                                        @Override
                                        public Object element() {
                                                throw new 
UnsupportedOperationException(
-                                                               "WindowFn 
attempted to access input element when none was available"); // TODO: 12/16/15 
aljoscha's comment in slack
+                                                               "WindowFn 
attempted to access input element when none was available");
                                        }
 
                                        @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 0f0a9d0..b78db65 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -238,9 +238,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, 
VOUT>
                                this.operator = 
StreamingGroupAlsoByWindowsDoFn.createForIterable(
                                                this.windowingStrategy, 
inputValueCoder);
                        } else {
-
                                Coder<K> inputKeyCoder = 
inputKvCoder.getKeyCoder();
-                               //CoderRegistry dataflowRegistry = 
input.getPipeline().getCoderRegistry();
 
                                AppliedCombineFn<K, VIN, VACC, VOUT> 
appliedCombineFn = AppliedCombineFn
                                                .withInputCoder(combineFn, 
coderRegistry, inputKvCoder);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
index b8824f5..c952d6f 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -22,6 +22,7 @@ import 
com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
 
 import java.io.ByteArrayInputStream;
 import java.util.List;
@@ -44,17 +45,15 @@ public class FlinkStreamingCreateFunction<IN, OUT> 
implements FlatMapFunction<IN
        public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws 
Exception {
 
                @SuppressWarnings("unchecked")
-               // TODO Flink doesn't allow null values in records
                OUT voidValue = (OUT) 
VoidCoderTypeSerializer.VoidValue.INSTANCE;
-
                for (byte[] element : elements) {
                        ByteArrayInputStream bai = new 
ByteArrayInputStream(element);
                        OUT outValue = coder.decode(bai, Coder.Context.OUTER);
 
                        if (outValue == null) {
-                               out.collect(WindowedValue.of(voidValue, 
GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+                               out.collect(WindowedValue.of(voidValue, 
Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
                        } else {
-                               out.collect(WindowedValue.of(outValue, 
GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+                               out.collect(WindowedValue.of(outValue, 
Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 3e248a6..cdc2e95 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -38,7 +38,7 @@ public class UnboundedSourceWrapper<T> extends 
RichSourceFunction<WindowedValue<
        private final UnboundedSource.UnboundedReader<T> reader;
 
        private StreamingRuntimeContext runtime = null;
-       private StreamSource.ManualWatermarkContext<T> context = null;
+       private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = 
null;
 
        private volatile boolean isRunning = false;
 
@@ -51,8 +51,7 @@ public class UnboundedSourceWrapper<T> extends 
RichSourceFunction<WindowedValue<
                return this.name;
        }
 
-       WindowedValue<T> makeWindowedValue(
-                       T output, Instant timestamp, Collection<? extends 
BoundedWindow> windows, PaneInfo pane) {
+       WindowedValue<T> makeWindowedValue(T output, Instant timestamp) {
                if (timestamp == null) {
                        timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
                }
@@ -66,7 +65,7 @@ public class UnboundedSourceWrapper<T> extends 
RichSourceFunction<WindowedValue<
                                        "Apparently " + this.name + " is not. 
Probably you should consider writing your own Wrapper for this source.");
                }
 
-               context = (StreamSource.ManualWatermarkContext<T>) ctx;
+               context = 
(StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
                runtime = (StreamingRuntimeContext) getRuntimeContext();
 
                this.isRunning = reader.start();
@@ -78,11 +77,9 @@ public class UnboundedSourceWrapper<T> extends 
RichSourceFunction<WindowedValue<
                        T item = reader.getCurrent();
                        Instant timestamp = reader.getCurrentTimestamp();
 
-                       long milliseconds = timestamp.getMillis();
-
                        // write it to the output collector
                        synchronized (ctx.getCheckpointLock()) {
-                               
ctx.collectWithTimestamp(makeWindowedValue(item, timestamp, null, 
PaneInfo.NO_FIRING), milliseconds);
+                               
context.collectWithTimestamp(makeWindowedValue(item, timestamp), 
timestamp.getMillis());
                        }
 
                        // try to go to the next record

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
index c6e3e99..2b1f091 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
@@ -56,14 +56,14 @@ public class AvroITCase extends JavaProgramTestBase {
        }
 
        private static void runProgram(String tmpPath, String resultPath) {
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                p.apply(Create.of(new User("Joe", 3, "red"), new User("Mary", 
4, "blue")).withCoder(AvroCoder.of(User.class)))
                                
.apply(AvroIO.Write.to(tmpPath).withSchema(User.class));
 
                p.run();
 
-               p = FlinkTestPipeline.create();
+               p = FlinkTestPipeline.createForBatch();
 
                p.apply(AvroIO.Read.from(tmpPath).withSchema(User.class))
                                .apply(ParDo.of(new DoFn<User, String>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
index bc24514..928388c 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
@@ -51,7 +51,7 @@ public class FlattenizeITCase extends JavaProgramTestBase {
 
        @Override
        protected void testProgram() throws Exception {
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                PCollection<String> p1 = p.apply(Create.of(words));
                PCollection<String> p2 = p.apply(Create.of(words2));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 109b1ff..56af3f1 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -32,7 +32,7 @@ public class FlinkTestPipeline extends Pipeline {
         * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} 
to add tests, then call
         * {@link Pipeline#run} to execute the pipeline and check the tests.
         */
-       public static FlinkTestPipeline create() {
+       public static FlinkTestPipeline createForBatch() {
                return create(false);
        }
 
@@ -44,7 +44,7 @@ public class FlinkTestPipeline extends Pipeline {
         *
         * @return The Test Pipeline
         */
-       public static FlinkTestPipeline createStreaming() {
+       public static FlinkTestPipeline createForStreaming() {
                return create(true);
        }
 
@@ -54,18 +54,18 @@ public class FlinkTestPipeline extends Pipeline {
         * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} 
to add tests, then call
         * {@link Pipeline#run} to execute the pipeline and check the tests.
         *
-        * @param streaming True for streaming mode, False for batch
-        * @return The Test Pipeline
+        * @param streaming <code>True</code> for streaming mode, 
<code>False</code> for batch.
+        * @return The Test Pipeline.
         */
-       public static FlinkTestPipeline create(boolean streaming) {
+       private static FlinkTestPipeline create(boolean streaming) {
                FlinkPipelineRunner flinkRunner = 
FlinkPipelineRunner.createForTest(streaming);
                FlinkPipelineOptions pipelineOptions = 
flinkRunner.getPipelineOptions();
                pipelineOptions.setStreaming(streaming);
                return new FlinkTestPipeline(flinkRunner, pipelineOptions);
        }
 
-       private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> 
runner, PipelineOptions
-                       options) {
+       private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> 
runner,
+                                                       PipelineOptions 
options) {
                super(runner, options);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index ed2ecf5..af0f217 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -84,7 +84,7 @@ public class JoinExamplesITCase extends JavaProgramTestBase {
        @Override
        protected void testProgram() throws Exception {
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
                PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
index 29c34d4..35f2eaf 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
@@ -47,7 +47,7 @@ public class MaybeEmptyTestITCase extends JavaProgramTestBase 
implements Seriali
        @Override
        protected void testProgram() throws Exception {
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
                                .apply(ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
index dbe88d2..ccdbbf9 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
@@ -47,7 +47,7 @@ public class ParDoMultiOutputITCase extends 
JavaProgramTestBase implements Seria
 
        @Override
        protected void testProgram() throws Exception {
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                PCollection<String> words = p.apply(Create.of("Hello", 
"Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
index ba675b1..39f54e4 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
@@ -61,7 +61,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
 
        private static void runProgram(String resultPath) {
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                PCollection<String> result = p
                                .apply(Read.from(new ReadSource(1, 10)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
index ff59db7..db794f7 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
@@ -52,7 +52,7 @@ public class RemoveDuplicatesEmptyITCase extends 
JavaProgramTestBase {
 
                List<String> strings = Collections.emptyList();
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                PCollection<String> input =
                                p.apply(Create.of(strings))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
index a8200aa..04e06b8 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
@@ -53,7 +53,7 @@ public class RemoveDuplicatesITCase extends 
JavaProgramTestBase {
 
                List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", 
"k1", "k2", "k3");
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                PCollection<String> input =
                                p.apply(Create.of(strings))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
index d932c80..ee8843c 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
@@ -36,7 +36,7 @@ public class SideInputITCase extends JavaProgramTestBase 
implements Serializable
        protected void testProgram() throws Exception {
 
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
 
                final PCollectionView<String> sidesInput = p

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
index e801ac4..1b4afb3 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
@@ -53,7 +53,7 @@ public class TfIdfITCase extends JavaProgramTestBase {
        @Override
        protected void testProgram() throws Exception {
 
-               Pipeline pipeline = FlinkTestPipeline.create();
+               Pipeline pipeline = FlinkTestPipeline.createForBatch();
 
                pipeline.getCoderRegistry().registerCoder(URI.class, 
StringDelegateCoder.of(URI.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
deleted file mode 100644
index eb020c5..0000000
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.dataartisans.flink.dataflow;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-
-/**
- * Session window test
- */
-public class TopWikipediaSessionsITCase extends StreamingProgramTestBase 
implements Serializable {
-       protected String resultPath;
-
-       public TopWikipediaSessionsITCase(){
-       }
-
-       static final String[] EXPECTED_RESULT = new String[] {
-                       "user: user1 value:3",
-                       "user: user1 value:1",
-                       "user: user2 value:4",
-                       "user: user2 value:6",
-                       "user: user3 value:7",
-                       "user: user3 value:2"
-       };
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-
-               Pipeline p = FlinkTestPipeline.createStreaming();
-
-               long now = System.currentTimeMillis() + 10000;
-               System.out.println((now + 5000) / 1000);
-
-               PCollection<KV<String, Long>> output =
-                       p.apply(Create.of(Arrays.asList(new 
TableRow().set("timestamp", now).set
-                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now + 10).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now).set
-                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now + 2).set
-                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 1).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 5).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 7).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 8).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 200).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 230).set
-                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now + 230).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 240).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 245).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 235).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 236).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 237).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 238).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 239).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 240).set
-                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 241).set
-                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now)
-                                       .set("contributor_username", "user3"))))
-
-
-
-                       .apply(ParDo.of(new DoFn<TableRow, String>() {
-                               @Override
-                               public void processElement(ProcessContext c) 
throws Exception {
-                                       TableRow row = c.element();
-                                       long timestamp = (Long) 
row.get("timestamp");
-                                       String userName = (String) 
row.get("contributor_username");
-                                       if (userName != null) {
-                                               // Sets the timestamp field to 
be used in windowing.
-                                               c.outputWithTimestamp(userName, 
new Instant(timestamp * 1000L));
-                                       }
-                               }
-                       }))
-
-                       .apply(ParDo.named("SampleUsers").of(
-                                       new DoFn<String, String>() {
-                                               private static final long 
serialVersionUID = 0;
-
-                                               @Override
-                                               public void 
processElement(ProcessContext c) {
-                                                       if 
(Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) {
-                                                               
c.output(c.element());
-                                                       }
-                                               }
-                                       }))
-
-                                       
.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-                                       .apply(Count.<String>perElement());
-
-               PCollection<String> format = output.apply(ParDo.of(new 
DoFn<KV<String, Long>, String>() {
-                       @Override
-                       public void processElement(ProcessContext c) throws 
Exception {
-                               KV<String, Long> el = c.element();
-                               String out = "user: " + el.getKey() + " value:" 
+ el.getValue();
-                               System.out.println(out);
-                               c.output(out);
-                       }
-               }));
-
-               format.apply(TextIO.Write.to(resultPath));
-
-               p.run();
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
index 9427ab6..5ddd379 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
@@ -58,7 +58,7 @@ public class WordCountITCase extends JavaProgramTestBase {
        @Override
        protected void testProgram() throws Exception {
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                PCollection<String> input = 
p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
index c3eed61..ccc52c4 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
@@ -70,7 +70,7 @@ public class WordCountJoin2ITCase extends JavaProgramTestBase 
{
 
        @Override
        protected void testProgram() throws Exception {
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                /* Create two PCollections and join them */
                PCollection<KV<String,Long>> occurences1 = 
p.apply(Create.of(WORDS_1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
index 33e67cc..e6eddc0 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
@@ -80,7 +80,7 @@ public class WordCountJoin3ITCase extends JavaProgramTestBase 
{
        @Override
        protected void testProgram() throws Exception {
 
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                /* Create two PCollections and join them */
                PCollection<KV<String,Long>> occurences1 = 
p.apply(Create.of(WORDS_1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
index 205fe9b..865fc5f 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
@@ -63,7 +63,7 @@ public class WriteSinkITCase extends JavaProgramTestBase {
        }
 
        private static void runProgram(String resultPath) {
-               Pipeline p = FlinkTestPipeline.create();
+               Pipeline p = FlinkTestPipeline.createForBatch();
 
                
p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
                        .apply("CustomSink", Write.to(new 
MyCustomSink(resultPath)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index b667187..1f36ee7 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -86,7 +86,7 @@ public class GroupAlsoByWindowTest {
                                
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
                                .withAllowedLateness(Duration.millis(1000));
                long initialTime = 0L;
-               Pipeline pipeline = FlinkTestPipeline.create();
+               Pipeline pipeline = FlinkTestPipeline.createForStreaming();
 
                KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
@@ -145,7 +145,7 @@ public class GroupAlsoByWindowTest {
                WindowingStrategy strategy = sessionWindowingStrategy;
 
                long initialTime = 0L;
-               Pipeline pipeline = FlinkTestPipeline.create();
+               Pipeline pipeline = FlinkTestPipeline.createForStreaming();
 
                KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
@@ -382,7 +382,7 @@ public class GroupAlsoByWindowTest {
        }
 
        private OneInputStreamOperatorTestHarness 
createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) 
throws Exception {
-               Pipeline pipeline = FlinkTestPipeline.create();
+               Pipeline pipeline = FlinkTestPipeline.createForStreaming();
 
                KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
@@ -478,7 +478,7 @@ public class GroupAlsoByWindowTest {
                                        @Override
                                        public Object element() {
                                                throw new 
UnsupportedOperationException(
-                                                               "WindowFn 
attempted to access input element when none was available"); // TODO: 12/16/15 
aljoscha's comment in slack
+                                                               "WindowFn 
attempted to access input element when none was available");
                                        }
 
                                        @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
new file mode 100644
index 0000000..1c800fa
--- /dev/null
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkTestPipeline;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+
+/**
+ * Session window test
+ */
+public class TopWikipediaSessionsITCase extends StreamingProgramTestBase 
implements Serializable {
+       protected String resultPath;
+
+       public TopWikipediaSessionsITCase(){
+       }
+
+       static final String[] EXPECTED_RESULT = new String[] {
+                       "user: user1 value:3",
+                       "user: user1 value:1",
+                       "user: user2 value:4",
+                       "user: user2 value:6",
+                       "user: user3 value:7",
+                       "user: user3 value:2"
+       };
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+
+               Pipeline p = FlinkTestPipeline.createForStreaming();
+
+               long now = System.currentTimeMillis() + 10000;
+               System.out.println((now + 5000) / 1000);
+
+               PCollection<KV<String, Long>> output =
+                       p.apply(Create.of(Arrays.asList(new 
TableRow().set("timestamp", now).set
+                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now + 10).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now).set
+                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now + 2).set
+                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 1).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 5).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 7).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 8).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 200).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 230).set
+                                       ("contributor_username", "user1"), new 
TableRow().set("timestamp", now + 230).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 240).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now + 245).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 235).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 236).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 237).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 238).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 239).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 240).set
+                                       ("contributor_username", "user3"), new 
TableRow().set("timestamp", now + 241).set
+                                       ("contributor_username", "user2"), new 
TableRow().set("timestamp", now)
+                                       .set("contributor_username", "user3"))))
+
+
+
+                       .apply(ParDo.of(new DoFn<TableRow, String>() {
+                               @Override
+                               public void processElement(ProcessContext c) 
throws Exception {
+                                       TableRow row = c.element();
+                                       long timestamp = (Long) 
row.get("timestamp");
+                                       String userName = (String) 
row.get("contributor_username");
+                                       if (userName != null) {
+                                               // Sets the timestamp field to 
be used in windowing.
+                                               c.outputWithTimestamp(userName, 
new Instant(timestamp * 1000L));
+                                       }
+                               }
+                       }))
+
+                       .apply(ParDo.named("SampleUsers").of(
+                                       new DoFn<String, String>() {
+                                               private static final long 
serialVersionUID = 0;
+
+                                               @Override
+                                               public void 
processElement(ProcessContext c) {
+                                                       if 
(Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) {
+                                                               
c.output(c.element());
+                                                       }
+                                               }
+                                       }))
+
+                                       
.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
+                                       .apply(Count.<String>perElement());
+
+               PCollection<String> format = output.apply(ParDo.of(new 
DoFn<KV<String, Long>, String>() {
+                       @Override
+                       public void processElement(ProcessContext c) throws 
Exception {
+                               KV<String, Long> el = c.element();
+                               String out = "user: " + el.getKey() + " value:" 
+ el.getValue();
+                               System.out.println(out);
+                               c.output(out);
+                       }
+               }));
+
+               format.apply(TextIO.Write.to(resultPath));
+
+               p.run();
+       }
+}

Reply via email to