http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
new file mode 100644
index 0000000..e39b81d
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.VoidCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.Serializable;
+
+public class MaybeEmptyTestITCase extends JavaProgramTestBase implements 
Serializable {
+
+  protected String resultPath;
+
+  protected final String expected = "test";
+
+  public MaybeEmptyTestITCase() {
+  }
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(expected, resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
+        .apply(ParDo.of(
+            new DoFn<Void, String>() {
+              @Override
+              public void processElement(DoFn<Void, String>.ProcessContext c) {
+                c.output(expected);
+              }
+            })).apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
new file mode 100644
index 0000000..08e5323
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.cloud.dataflow.sdk.values.TupleTagList;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.Serializable;
+
+public class ParDoMultiOutputITCase extends JavaProgramTestBase implements 
Serializable {
+
+  private String resultPath;
+
+  private static String[] expectedWords = {"MAAA", "MAAFOOO"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", 
"hey", "SPECIALthere", "MAAA", "MAAFOOO"));
+
+    // Select words whose length is below a cut off,
+    // plus the lengths of words that are above the cut off.
+    // Also select words starting with "MARKER".
+    final int wordLengthCutOff = 3;
+    // Create tags to use for the main and side outputs.
+    final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
+    final TupleTag<Integer> wordLengthsAboveCutOffTag = new 
TupleTag<Integer>(){};
+    final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
+
+    PCollectionTuple results =
+        words.apply(ParDo
+            .withOutputTags(wordsBelowCutOffTag, 
TupleTagList.of(wordLengthsAboveCutOffTag)
+                .and(markedWordsTag))
+            .of(new DoFn<String, String>() {
+              final TupleTag<String> specialWordsTag = new TupleTag<String>() {
+              };
+
+              public void processElement(ProcessContext c) {
+                String word = c.element();
+                if (word.length() <= wordLengthCutOff) {
+                  c.output(word);
+                } else {
+                  c.sideOutput(wordLengthsAboveCutOffTag, word.length());
+                }
+                if (word.startsWith("MAA")) {
+                  c.sideOutput(markedWordsTag, word);
+                }
+
+                if (word.startsWith("SPECIAL")) {
+                  c.sideOutput(specialWordsTag, word);
+                }
+              }
+            }));
+
+    // Extract the PCollection results, by tag.
+    PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
+    PCollection<Integer> wordLengthsAboveCutOff = results.get
+        (wordLengthsAboveCutOffTag);
+    PCollection<String> markedWords = results.get(markedWordsTag);
+
+    markedWords.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
new file mode 100644
index 0000000..7202417
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class ReadSourceITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public ReadSourceITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> result = p
+        .apply(Read.from(new ReadSource(1, 10)))
+        .apply(ParDo.of(new DoFn<Integer, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }));
+
+    result.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
+
+
+  private static class ReadSource extends BoundedSource<Integer> {
+    final int from;
+    final int to;
+
+    ReadSource(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, 
PipelineOptions options)
+        throws Exception {
+      List<ReadSource> res = new ArrayList<>();
+      FlinkPipelineOptions flinkOptions = 
options.as(FlinkPipelineOptions.class);
+      int numWorkers = flinkOptions.getParallelism();
+      Preconditions.checkArgument(numWorkers > 0, "Number of workers should be 
larger than 0.");
+
+      float step = 1.0f * (to - from) / numWorkers;
+      for (int i = 0; i < numWorkers; ++i) {
+        res.add(new ReadSource(Math.round(from + i * step), Math.round(from + 
(i + 1) * step)));
+      }
+      return res;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws 
Exception {
+      return 8 * (to - from);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws 
Exception {
+      return true;
+    }
+
+    @Override
+    public BoundedReader<Integer> createReader(PipelineOptions options) throws 
IOException {
+      return new RangeReader(this);
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<Integer> getDefaultOutputCoder() {
+      return BigEndianIntegerCoder.of();
+    }
+
+    private class RangeReader extends BoundedReader<Integer> {
+      private int current;
+
+      public RangeReader(ReadSource source) {
+        this.current = source.from - 1;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return true;
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        current++;
+        return (current < to);
+      }
+
+      @Override
+      public Integer getCurrent() {
+        return current;
+      }
+
+      @Override
+      public void close() throws IOException {
+        // Nothing
+      }
+
+      @Override
+      public BoundedSource<Integer> getCurrentSource() {
+        return ReadSource.this;
+      }
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
new file mode 100644
index 0000000..dc82d7d
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.Collections;
+import java.util.List;
+
+
+public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public RemoveDuplicatesEmptyITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    List<String> strings = Collections.emptyList();
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> input =
+        p.apply(Create.of(strings))
+            .setCoder(StringUtf8Coder.of());
+
+    PCollection<String> output =
+        input.apply(RemoveDuplicates.<String>create());
+
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
new file mode 100644
index 0000000..78b48b5
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+public class RemoveDuplicatesITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public RemoveDuplicatesITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "k1", "k5", "k2", "k3"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", 
"k3");
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> input =
+        p.apply(Create.of(strings))
+            .setCoder(StringUtf8Coder.of());
+
+    PCollection<String> output =
+        input.apply(RemoveDuplicates.<String>create());
+
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
new file mode 100644
index 0000000..5cd7d78
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.Serializable;
+
+public class SideInputITCase extends JavaProgramTestBase implements 
Serializable {
+
+  private static final String expected = "Hello!";
+
+  protected String resultPath;
+
+  @Override
+  protected void testProgram() throws Exception {
+
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+
+    final PCollectionView<String> sidesInput = p
+        .apply(Create.of(expected))
+        .apply(View.<String>asSingleton());
+
+    p.apply(Create.of("bli"))
+        .apply(ParDo.of(new DoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            String s = c.sideInput(sidesInput);
+            c.output(s);
+          }
+        }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(expected, resultPath);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
new file mode 100644
index 0000000..ceb0a3f
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.examples.TFIDF;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Keys;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.net.URI;
+
+
+public class TfIdfITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public TfIdfITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "a", "m", "n", "b", "c", "d"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline pipeline = FlinkTestPipeline.createForBatch();
+
+    pipeline.getCoderRegistry().registerCoder(URI.class, 
StringDelegateCoder.of(URI.class));
+
+    PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
+        .apply(Create.of(
+            KV.of(new URI("x"), "a b c d"),
+            KV.of(new URI("y"), "a b c"),
+            KV.of(new URI("z"), "a m n")))
+        .apply(new TFIDF.ComputeTfIdf());
+
+    PCollection<String> words = wordToUriAndTfIdf
+        .apply(Keys.<String>create())
+        .apply(RemoveDuplicates.<String>create());
+
+    words.apply(TextIO.Write.to(resultPath));
+
+    pipeline.run();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
new file mode 100644
index 0000000..c2b6fdd
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.examples.WordCount;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.MapElements;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+public class WordCountITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public WordCountITCase(){
+  }
+
+  static final String[] WORDS_ARRAY = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+
+  static final String[] COUNTS_ARRAY = new String[] {
+      "hi: 5", "there: 1", "sue: 2", "bob: 2"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> input = 
p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+
+    input
+        .apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
new file mode 100644
index 0000000..d78434b
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+
+public class WordCountJoin2ITCase extends JavaProgramTestBase {
+
+  static final String[] WORDS_1 = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+  static final String[] WORDS_2 = new String[] {
+      "hi tim", "beauty", "hooray sue bob",
+      "hi there", "", "please say hi"};
+
+  static final String[] RESULTS = new String[] {
+      "beauty -> Tag1: Tag2: 1",
+      "bob -> Tag1: 2 Tag2: 1",
+      "hi -> Tag1: 5 Tag2: 3",
+      "hooray -> Tag1: Tag2: 1",
+      "please -> Tag1: Tag2: 1",
+      "say -> Tag1: Tag2: 1",
+      "sue -> Tag1: 2 Tag2: 1",
+      "there -> Tag1: 1 Tag2: 1",
+      "tim -> Tag1: Tag2: 1"
+  };
+
+  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+
+  protected String resultPath;
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    /* Create two PCollections and join them */
+    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    /* CoGroup the two collections */
+    PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
+        .of(tag1, occurences1)
+        .and(tag2, occurences2)
+        .apply(CoGroupByKey.<String>create());
+
+    /* Format output */
+    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+        .apply(TextIO.Write.named("test").to(resultPath));
+
+    p.run();
+  }
+
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+
+    @Override
+    public void startBundle(Context c) {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      CoGbkResult value = c.element().getValue();
+      String key = c.element().getKey();
+      String countTag1 = tag1.getId() + ": ";
+      String countTag2 = tag2.getId() + ": ";
+      for (Long count : value.getAll(tag1)) {
+        countTag1 += count + " ";
+      }
+      for (Long count : value.getAll(tag2)) {
+        countTag2 += count;
+      }
+      c.output(key + " -> " + countTag1 + countTag2);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
new file mode 100644
index 0000000..0836279
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
+import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.base.Joiner;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+
+public class WordCountJoin3ITCase extends JavaProgramTestBase {
+
+  static final String[] WORDS_1 = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+  static final String[] WORDS_2 = new String[] {
+      "hi tim", "beauty", "hooray sue bob",
+      "hi there", "", "please say hi"};
+
+  static final String[] WORDS_3 = new String[] {
+      "hi stephan", "beauty", "hooray big fabian",
+      "hi yo", "", "please say hi"};
+
+  static final String[] RESULTS = new String[] {
+      "beauty -> Tag1: Tag2: 1 Tag3: 1",
+      "bob -> Tag1: 2 Tag2: 1 Tag3: ",
+      "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
+      "hooray -> Tag1: Tag2: 1 Tag3: 1",
+      "please -> Tag1: Tag2: 1 Tag3: 1",
+      "say -> Tag1: Tag2: 1 Tag3: 1",
+      "sue -> Tag1: 2 Tag2: 1 Tag3: ",
+      "there -> Tag1: 1 Tag2: 1 Tag3: ",
+      "tim -> Tag1: Tag2: 1 Tag3: ",
+      "stephan -> Tag1: Tag2: Tag3: 1",
+      "yo -> Tag1: Tag2: Tag3: 1",
+      "fabian -> Tag1: Tag2: Tag3: 1",
+      "big -> Tag1: Tag2: Tag3: 1"
+  };
+
+  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+  static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
+
+  protected String resultPath;
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    /* Create two PCollections and join them */
+    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    /* CoGroup the two collections */
+    PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
+        .of(tag1, occurences1)
+        .and(tag2, occurences2)
+        .and(tag3, occurences3)
+        .apply(CoGroupByKey.<String>create());
+
+    /* Format output */
+    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+        .apply(TextIO.Write.named("test").to(resultPath));
+
+    p.run();
+  }
+
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+
+    @Override
+    public void startBundle(Context c) {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      CoGbkResult value = c.element().getValue();
+      String key = c.element().getKey();
+      String countTag1 = tag1.getId() + ": ";
+      String countTag2 = tag2.getId() + ": ";
+      String countTag3 = tag3.getId() + ": ";
+      for (Long count : value.getAll(tag1)) {
+        countTag1 += count + " ";
+      }
+      for (Long count : value.getAll(tag2)) {
+        countTag2 += count + " ";
+      }
+      for (Long count : value.getAll(tag3)) {
+        countTag3 += count;
+      }
+      c.output(key + " -> " + countTag1 + countTag2 + countTag3);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
new file mode 100644
index 0000000..497a5bb
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.Sink;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Write;
+import com.google.common.base.Joiner;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.URI;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests the translation of custom Write.Bound sinks.
+ */
+public class WriteSinkITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public WriteSinkITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "Joe red 3", "Mary blue 4", "Max yellow 23"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
+      .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
+
+    p.run();
+  }
+
+  /**
+   * Simple custom sink which writes to a file.
+   */
+  private static class MyCustomSink extends Sink<String> {
+
+    private final String resultPath;
+
+    public MyCustomSink(String resultPath) {
+      this.resultPath = resultPath;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      assertNotNull(options);
+    }
+
+    @Override
+    public WriteOperation<String, ?> createWriteOperation(PipelineOptions 
options) {
+      return new MyWriteOperation();
+    }
+
+    private class MyWriteOperation extends WriteOperation<String, String> {
+
+      @Override
+      public Coder<String> getWriterResultCoder() {
+        return StringUtf8Coder.of();
+      }
+
+      @Override
+      public void initialize(PipelineOptions options) throws Exception {
+
+      }
+
+      @Override
+      public void finalize(Iterable<String> writerResults, PipelineOptions 
options) throws Exception {
+
+      }
+
+      @Override
+      public Writer<String, String> createWriter(PipelineOptions options) 
throws Exception {
+        return new MyWriter();
+      }
+
+      @Override
+      public Sink<String> getSink() {
+        return MyCustomSink.this;
+      }
+
+      /**
+       * Simple Writer which writes to a file.
+       */
+      private class MyWriter extends Writer<String, String> {
+
+        private PrintWriter internalWriter;
+
+        @Override
+        public void open(String uId) throws Exception {
+          Path path = new Path(resultPath + "/" + uId);
+          FileSystem.get(new URI("file:///")).create(path, false);
+          internalWriter = new PrintWriter(new File(path.toUri()));
+        }
+
+        @Override
+        public void write(String value) throws Exception {
+          internalWriter.println(value);
+        }
+
+        @Override
+        public String close() throws Exception {
+          internalWriter.close();
+          return resultPath;
+        }
+
+        @Override
+        public WriteOperation<String, String> getWriteOperation() {
+          return MyWriteOperation.this;
+        }
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
new file mode 100644
index 0000000..27ddc83
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
@@ -0,0 +1,506 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.common.base.Throwables;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GroupAlsoByWindowTest {
+
+  private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
+
+  private final WindowingStrategy 
slidingWindowWithAfterWatermarkTriggerStrategy =
+      
WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
+          
.withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+  private final WindowingStrategy sessionWindowingStrategy =
+      
WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
+          .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+          
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+          .withAllowedLateness(Duration.standardSeconds(100));
+
+  private final WindowingStrategy fixedWindowingStrategy =
+      WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
+
+  private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
+      fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
+
+  private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy 
=
+      fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
+
+  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
+    fixedWindowingStrategy.withTrigger(
+      
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
+        .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
+
+  /**
+   * The default accumulation mode is
+   * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
+   * This strategy changes it to
+   * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
+   */
+  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
+      fixedWindowWithCompoundTriggerStrategy
+          
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+  @Test
+  public void testWithLateness() throws Exception {
+    WindowingStrategy strategy = 
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
+        .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+        .withAllowedLateness(Duration.millis(1000));
+    long initialTime = 0L;
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 2000));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 4000));
+
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 4),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 2000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 5),
+            new Instant(initialTime + 1999),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
+        , initialTime + 1999));
+
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1999),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
+        , initialTime + 1999));
+    expectedOutput.add(new Watermark(initialTime + 4000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testSessionWindows() throws Exception {
+    WindowingStrategy strategy = sessionWindowingStrategy;
+
+    long initialTime = 0L;
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 6000));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processWatermark(new Watermark(initialTime + 12000));
+
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(1), new Instant(5700)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 6000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 11),
+            new Instant(initialTime + 6700),
+            new IntervalWindow(new Instant(1), new Instant(10900)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 6700));
+    expectedOutput.add(new Watermark(initialTime + 12000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testSlidingWindows() throws Exception {
+    WindowingStrategy strategy = 
slidingWindowWithAfterWatermarkTriggerStrategy;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+    testHarness.processWatermark(new Watermark(initialTime + 25000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 5000),
+            new IntervalWindow(new Instant(0), new Instant(10000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 5000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(-5000), new Instant(5000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 11),
+            new Instant(initialTime + 15000),
+            new IntervalWindow(new Instant(10000), new Instant(20000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 15000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 3),
+            new Instant(initialTime + 10000),
+            new IntervalWindow(new Instant(5000), new Instant(15000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key2", 1),
+            new Instant(initialTime + 19500),
+            new IntervalWindow(new Instant(10000), new Instant(20000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key2", 1),
+            new Instant(initialTime + 20000),
+            /**
+             * this is 20000 and not 19500 because of a convention in dataflow 
where
+             * timestamps of windowed values in a window cannot be smaller 
than the
+             * end of a previous window. Checkout the documentation of the
+             * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
+             */
+            new IntervalWindow(new Instant(15000), new Instant(25000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 20000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 8),
+            new Instant(initialTime + 20000),
+            new IntervalWindow(new Instant(15000), new Instant(25000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 20000));
+    expectedOutput.add(new Watermark(initialTime + 25000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testAfterWatermarkProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 6),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME)), initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 11),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testAfterCountProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
+
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, 
PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testCompoundProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
+
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    /**
+     * PaneInfo are:
+     *     isFirst (pane in window),
+     *     isLast, Timing (of triggering),
+     *     index (of pane in the window),
+     *     onTimeIndex (if it the 1st,2nd, ... pane that was fired on time)
+     * */
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, 
PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1),
+        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testCompoundAccumulatingPanesProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, 
PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 10),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 6),
+        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 11),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  private OneInputStreamOperatorTestHarness 
createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) 
throws Exception {
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processWatermark(new Watermark(initialTime + 10000));
+    testHarness.processWatermark(new Watermark(initialTime + 20000));
+
+    return testHarness;
+  }
+
+  private static class ResultSortComparator implements Comparator<Object> {
+    @Override
+    public int compare(Object o1, Object o2) {
+      if (o1 instanceof Watermark && o2 instanceof Watermark) {
+        Watermark w1 = (Watermark) o1;
+        Watermark w2 = (Watermark) o2;
+        return (int) (w1.getTimestamp() - w2.getTimestamp());
+      } else {
+        StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = 
(StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
+        StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = 
(StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
+
+        int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - 
sr1.getValue().getTimestamp().getMillis());
+        if (comparison != 0) {
+          return comparison;
+        }
+
+        comparison = 
sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+        if(comparison == 0) {
+          comparison = Integer.compare(
+              sr0.getValue().getValue().getValue(),
+              sr1.getValue().getValue().getValue());
+        }
+        if(comparison == 0) {
+          Collection windowsA = sr0.getValue().getWindows();
+          Collection windowsB = sr1.getValue().getWindows();
+
+          if(windowsA.size() != 1 || windowsB.size() != 1) {
+            throw new IllegalStateException("A value cannot belong to more 
than one windows after grouping.");
+          }
+
+          BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
+          BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
+          comparison = Long.compare(windowA.maxTimestamp().getMillis(), 
windowB.maxTimestamp().getMillis());
+        }
+        return comparison;
+      }
+    }
+  }
+
+  private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
+                           T output, Instant timestamp, Collection<? extends 
BoundedWindow> windows, PaneInfo pane) {
+    final Instant inputTimestamp = timestamp;
+    final WindowFn windowFn = strategy.getWindowFn();
+
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    if (windows == null) {
+      try {
+        windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public Object element() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input element when none was 
available");
+          }
+
+          @Override
+          public Instant timestamp() {
+            if (inputTimestamp == null) {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input timestamp when none was 
available");
+            }
+            return inputTimestamp;
+          }
+
+          @Override
+          public Collection<? extends BoundedWindow> windows() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input windows when none were 
available");
+          }
+        });
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    return WindowedValue.of(output, timestamp, windows, pane);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
new file mode 100644
index 0000000..80d78b9
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import org.apache.beam.runners.flink.FlinkTestPipeline;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.base.Joiner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class GroupByNullKeyTest extends StreamingProgramTestBase implements 
Serializable {
+
+
+  protected String resultPath;
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
+  };
+
+  public GroupByNullKeyTest(){
+  }
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, 
String>, String> {
+    private static final long serialVersionUID = 0;
+
+    @Override
+    public void processElement(ProcessContext c) {
+      KV<Integer, String> record = c.element();
+      long now = System.currentTimeMillis();
+      int timestamp = record.getKey();
+      String userName = record.getValue();
+      if (userName != null) {
+        // Sets the implicit timestamp field to be used in windowing.
+        c.outputWithTimestamp(userName, new Instant(timestamp + now));
+      }
+    }
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    PCollection<String> output =
+      p.apply(Create.of(Arrays.asList(
+          KV.<Integer, String>of(0, "user1"),
+          KV.<Integer, String>of(1, "user1"),
+          KV.<Integer, String>of(2, "user1"),
+          KV.<Integer, String>of(10, "user2"),
+          KV.<Integer, String>of(1, "user2"),
+          KV.<Integer, String>of(15000, "user2"),
+          KV.<Integer, String>of(12000, "user2"),
+          KV.<Integer, String>of(25000, "user3"))))
+          .apply(ParDo.of(new ExtractUserAndTimestamp()))
+          
.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
+              .triggering(AfterWatermark.pastEndOfWindow())
+              .withAllowedLateness(Duration.ZERO)
+              .discardingFiredPanes())
+
+          .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+            @Override
+            public void processElement(ProcessContext c) throws Exception {
+              String elem = c.element();
+              c.output(KV.<Void, String>of((Void) null, elem));
+            }
+          }))
+          .apply(GroupByKey.<Void, String>create())
+          .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+            @Override
+            public void processElement(ProcessContext c) throws Exception {
+              KV<Void, Iterable<String>> elem = c.element();
+              StringBuilder str = new StringBuilder();
+              str.append("k: " + elem.getKey() + " v:");
+              for (String v : elem.getValue()) {
+                str.append(" " + v);
+              }
+              c.output(str.toString());
+            }
+          }));
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
+}

Reply via email to