Repository: incubator-beam
Updated Branches:
  refs/heads/master c0b9fc660 -> 071e4dd67


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
deleted file mode 100644
index e73c456..0000000
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-
-public class WordCountJoin2ITCase extends JavaProgramTestBase {
-
-  static final String[] WORDS_1 = new String[] {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-
-  static final String[] WORDS_2 = new String[] {
-      "hi tim", "beauty", "hooray sue bob",
-      "hi there", "", "please say hi"};
-
-  static final String[] RESULTS = new String[] {
-      "beauty -> Tag1: Tag2: 1",
-      "bob -> Tag1: 2 Tag2: 1",
-      "hi -> Tag1: 5 Tag2: 3",
-      "hooray -> Tag1: Tag2: 1",
-      "please -> Tag1: Tag2: 1",
-      "say -> Tag1: Tag2: 1",
-      "sue -> Tag1: 2 Tag2: 1",
-      "there -> Tag1: 1 Tag2: 1",
-      "tim -> Tag1: Tag2: 1"
-  };
-
-  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
-  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-
-  protected String resultPath;
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    /* Create two PCollections and join them */
-    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Count.<String>perElement());
-
-    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Count.<String>perElement());
-
-    /* CoGroup the two collections */
-    PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
-        .of(tag1, occurences1)
-        .and(tag2, occurences2)
-        .apply(CoGroupByKey.<String>create());
-
-    /* Format output */
-    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
-        .apply(TextIO.Write.named("test").to(resultPath));
-
-    p.run();
-  }
-
-
-  static class ExtractWordsFn extends DoFn<String, String> {
-
-    @Override
-    public void startBundle(Context c) {
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      CoGbkResult value = c.element().getValue();
-      String key = c.element().getKey();
-      String countTag1 = tag1.getId() + ": ";
-      String countTag2 = tag2.getId() + ": ";
-      for (Long count : value.getAll(tag1)) {
-        countTag1 += count + " ";
-      }
-      for (Long count : value.getAll(tag2)) {
-        countTag2 += count;
-      }
-      c.output(key + " -> " + countTag1 + countTag2);
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
deleted file mode 100644
index 6b57d77..0000000
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import com.google.common.base.Joiner;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-
-public class WordCountJoin3ITCase extends JavaProgramTestBase {
-
-  static final String[] WORDS_1 = new String[] {
-      "hi there", "hi", "hi sue bob",
-      "hi sue", "", "bob hi"};
-
-  static final String[] WORDS_2 = new String[] {
-      "hi tim", "beauty", "hooray sue bob",
-      "hi there", "", "please say hi"};
-
-  static final String[] WORDS_3 = new String[] {
-      "hi stephan", "beauty", "hooray big fabian",
-      "hi yo", "", "please say hi"};
-
-  static final String[] RESULTS = new String[] {
-      "beauty -> Tag1: Tag2: 1 Tag3: 1",
-      "bob -> Tag1: 2 Tag2: 1 Tag3: ",
-      "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
-      "hooray -> Tag1: Tag2: 1 Tag3: 1",
-      "please -> Tag1: Tag2: 1 Tag3: 1",
-      "say -> Tag1: Tag2: 1 Tag3: 1",
-      "sue -> Tag1: 2 Tag2: 1 Tag3: ",
-      "there -> Tag1: 1 Tag2: 1 Tag3: ",
-      "tim -> Tag1: Tag2: 1 Tag3: ",
-      "stephan -> Tag1: Tag2: Tag3: 1",
-      "yo -> Tag1: Tag2: Tag3: 1",
-      "fabian -> Tag1: Tag2: Tag3: 1",
-      "big -> Tag1: Tag2: Tag3: 1"
-  };
-
-  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
-  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-  static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
-
-  protected String resultPath;
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    /* Create two PCollections and join them */
-    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Count.<String>perElement());
-
-    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Count.<String>perElement());
-
-    PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Count.<String>perElement());
-
-    /* CoGroup the two collections */
-    PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
-        .of(tag1, occurences1)
-        .and(tag2, occurences2)
-        .and(tag3, occurences3)
-        .apply(CoGroupByKey.<String>create());
-
-    /* Format output */
-    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
-        .apply(TextIO.Write.named("test").to(resultPath));
-
-    p.run();
-  }
-
-
-  static class ExtractWordsFn extends DoFn<String, String> {
-
-    @Override
-    public void startBundle(Context c) {
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      CoGbkResult value = c.element().getValue();
-      String key = c.element().getKey();
-      String countTag1 = tag1.getId() + ": ";
-      String countTag2 = tag2.getId() + ": ";
-      String countTag3 = tag3.getId() + ": ";
-      for (Long count : value.getAll(tag1)) {
-        countTag1 += count + " ";
-      }
-      for (Long count : value.getAll(tag2)) {
-        countTag2 += count + " ";
-      }
-      for (Long count : value.getAll(tag3)) {
-        countTag3 += count;
-      }
-      c.output(key + " -> " + countTag1 + countTag2 + countTag3);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
deleted file mode 100644
index dfa15ce..0000000
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.Sink;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.Write;
-import com.google.common.base.Joiner;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.File;
-import java.io.PrintWriter;
-import java.net.URI;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests the translation of custom Write.Bound sinks.
- */
-public class WriteSinkITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public WriteSinkITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "Joe red 3", "Mary blue 4", "Max yellow 23"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-    runProgram(resultPath);
-  }
-
-  private static void runProgram(String resultPath) {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
-      .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
-
-    p.run();
-  }
-
-  /**
-   * Simple custom sink which writes to a file.
-   */
-  private static class MyCustomSink extends Sink<String> {
-
-    private final String resultPath;
-
-    public MyCustomSink(String resultPath) {
-      this.resultPath = resultPath;
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      assertNotNull(options);
-    }
-
-    @Override
-    public WriteOperation<String, ?> createWriteOperation(PipelineOptions 
options) {
-      return new MyWriteOperation();
-    }
-
-    private class MyWriteOperation extends WriteOperation<String, String> {
-
-      @Override
-      public Coder<String> getWriterResultCoder() {
-        return StringUtf8Coder.of();
-      }
-
-      @Override
-      public void initialize(PipelineOptions options) throws Exception {
-
-      }
-
-      @Override
-      public void finalize(Iterable<String> writerResults, PipelineOptions 
options) throws Exception {
-
-      }
-
-      @Override
-      public Writer<String, String> createWriter(PipelineOptions options) 
throws Exception {
-        return new MyWriter();
-      }
-
-      @Override
-      public Sink<String> getSink() {
-        return MyCustomSink.this;
-      }
-
-      /**
-       * Simple Writer which writes to a file.
-       */
-      private class MyWriter extends Writer<String, String> {
-
-        private PrintWriter internalWriter;
-
-        @Override
-        public void open(String uId) throws Exception {
-          Path path = new Path(resultPath + "/" + uId);
-          FileSystem.get(new URI("file:///")).create(path, false);
-          internalWriter = new PrintWriter(new File(path.toUri()));
-        }
-
-        @Override
-        public void write(String value) throws Exception {
-          internalWriter.println(value);
-        }
-
-        @Override
-        public String close() throws Exception {
-          internalWriter.close();
-          return resultPath;
-        }
-
-        @Override
-        public WriteOperation<String, String> getWriteOperation() {
-          return MyWriteOperation.this;
-        }
-      }
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
deleted file mode 100644
index 880da59..0000000
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.Throwables;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class GroupAlsoByWindowTest {
-
-  private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
-
-  private final WindowingStrategy 
slidingWindowWithAfterWatermarkTriggerStrategy =
-      
WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
-          
.withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-  private final WindowingStrategy sessionWindowingStrategy =
-      
WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
-          .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
-          
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
-          .withAllowedLateness(Duration.standardSeconds(100));
-
-  private final WindowingStrategy fixedWindowingStrategy =
-      WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
-
-  private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
-      fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
-
-  private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy 
=
-      fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
-
-  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
-    fixedWindowingStrategy.withTrigger(
-      
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
-        .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
-
-  /**
-   * The default accumulation mode is
-   * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
-   * This strategy changes it to
-   * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
-   */
-  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
-      fixedWindowWithCompoundTriggerStrategy
-          
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-  @Test
-  public void testWithLateness() throws Exception {
-    WindowingStrategy strategy = 
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
-        .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
-        .withAllowedLateness(Duration.millis(1000));
-    long initialTime = 0L;
-    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
-
-    FlinkGroupAlsoByWindowWrapper gbwOperaror =
-        FlinkGroupAlsoByWindowWrapper.createForTesting(
-            pipeline.getOptions(),
-            pipeline.getCoderRegistry(),
-            strategy,
-            inputCoder,
-            combiner.<String>asKeyedFn());
-
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
-        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
-    testHarness.open();
-
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processWatermark(new Watermark(initialTime + 2000));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processWatermark(new Watermark(initialTime + 4000));
-
-    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 4),
-            new Instant(initialTime + 1),
-            new IntervalWindow(new Instant(0), new Instant(2000)),
-            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-        , initialTime + 1));
-    expectedOutput.add(new Watermark(initialTime + 2000));
-
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 5),
-            new Instant(initialTime + 1999),
-            new IntervalWindow(new Instant(0), new Instant(2000)),
-            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
-        , initialTime + 1999));
-
-
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 6),
-            new Instant(initialTime + 1999),
-            new IntervalWindow(new Instant(0), new Instant(2000)),
-            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
-        , initialTime + 1999));
-    expectedOutput.add(new Watermark(initialTime + 4000));
-
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-    testHarness.close();
-  }
-
-  @Test
-  public void testSessionWindows() throws Exception {
-    WindowingStrategy strategy = sessionWindowingStrategy;
-
-    long initialTime = 0L;
-    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
-
-    FlinkGroupAlsoByWindowWrapper gbwOperaror =
-        FlinkGroupAlsoByWindowWrapper.createForTesting(
-            pipeline.getOptions(),
-            pipeline.getCoderRegistry(),
-            strategy,
-            inputCoder,
-            combiner.<String>asKeyedFn());
-
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
-        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
-    testHarness.open();
-
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processWatermark(new Watermark(initialTime + 6000));
-
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), 
initialTime + 20));
-
-    testHarness.processWatermark(new Watermark(initialTime + 12000));
-
-    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 6),
-            new Instant(initialTime + 1),
-            new IntervalWindow(new Instant(1), new Instant(5700)),
-            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-        , initialTime + 1));
-    expectedOutput.add(new Watermark(initialTime + 6000));
-
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 11),
-            new Instant(initialTime + 6700),
-            new IntervalWindow(new Instant(1), new Instant(10900)),
-            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-        , initialTime + 6700));
-    expectedOutput.add(new Watermark(initialTime + 12000));
-
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-    testHarness.close();
-  }
-
-  @Test
-  public void testSlidingWindows() throws Exception {
-    WindowingStrategy strategy = 
slidingWindowWithAfterWatermarkTriggerStrategy;
-    long initialTime = 0L;
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
-        createTestingOperatorAndState(strategy, initialTime);
-    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-    testHarness.processWatermark(new Watermark(initialTime + 25000));
-
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 6),
-            new Instant(initialTime + 5000),
-            new IntervalWindow(new Instant(0), new Instant(10000)),
-            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-        , initialTime + 5000));
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 6),
-            new Instant(initialTime + 1),
-            new IntervalWindow(new Instant(-5000), new Instant(5000)),
-            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-        , initialTime + 1));
-    expectedOutput.add(new Watermark(initialTime + 10000));
-
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 11),
-            new Instant(initialTime + 15000),
-            new IntervalWindow(new Instant(10000), new Instant(20000)),
-            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-        , initialTime + 15000));
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 3),
-            new Instant(initialTime + 10000),
-            new IntervalWindow(new Instant(5000), new Instant(15000)),
-            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-        , initialTime + 10000));
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key2", 1),
-            new Instant(initialTime + 19500),
-            new IntervalWindow(new Instant(10000), new Instant(20000)),
-            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-        , initialTime + 19500));
-    expectedOutput.add(new Watermark(initialTime + 20000));
-
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key2", 1),
-            new Instant(initialTime + 20000),
-            /**
-             * this is 20000 and not 19500 because of a convention in dataflow 
where
-             * timestamps of windowed values in a window cannot be smaller 
than the
-             * end of a previous window. Checkout the documentation of the
-             * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
-             */
-            new IntervalWindow(new Instant(15000), new Instant(25000)),
-            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-        , initialTime + 20000));
-    expectedOutput.add(new StreamRecord<>(
-        WindowedValue.of(KV.of("key1", 8),
-            new Instant(initialTime + 20000),
-            new IntervalWindow(new Instant(15000), new Instant(25000)),
-            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-        , initialTime + 20000));
-    expectedOutput.add(new Watermark(initialTime + 25000));
-
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-    testHarness.close();
-  }
-
-  @Test
-  public void testAfterWatermarkProgram() throws Exception {
-    WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
-    long initialTime = 0L;
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-        createTestingOperatorAndState(strategy, initialTime);
-    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
-    expectedOutput.add(new Watermark(initialTime + 10000));
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-    expectedOutput.add(new Watermark(initialTime + 20000));
-
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-    testHarness.close();
-  }
-
-  @Test
-  public void testAfterCountProgram() throws Exception {
-    WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
-
-    long initialTime = 0L;
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-        createTestingOperatorAndState(strategy, initialTime);
-    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
-    expectedOutput.add(new Watermark(initialTime + 10000));
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
-    expectedOutput.add(new Watermark(initialTime + 20000));
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-    testHarness.close();
-  }
-
-  @Test
-  public void testCompoundProgram() throws Exception {
-    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
-
-    long initialTime = 0L;
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-        createTestingOperatorAndState(strategy, initialTime);
-    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-    /**
-     * PaneInfo are:
-     *     isFirst (pane in window),
-     *     isLast, Timing (of triggering),
-     *     index (of pane in the window),
-     *     onTimeIndex (if it the 1st,2nd, ... pane that was fired on time)
-     * */
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
-
-    expectedOutput.add(new Watermark(initialTime + 10000));
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-
-    expectedOutput.add(new Watermark(initialTime + 20000));
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-    testHarness.close();
-  }
-
-  @Test
-  public void testCompoundAccumulatingPanesProgram() throws Exception {
-    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
-    long initialTime = 0L;
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-        createTestingOperatorAndState(strategy, initialTime);
-    ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
-
-    expectedOutput.add(new Watermark(initialTime + 10000));
-
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
-    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
-
-    expectedOutput.add(new Watermark(initialTime + 20000));
-    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-    testHarness.close();
-  }
-
-  private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception {
-    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
-    FlinkGroupAlsoByWindowWrapper gbwOperaror =
-        FlinkGroupAlsoByWindowWrapper.createForTesting(
-            pipeline.getOptions(),
-            pipeline.getCoderRegistry(),
-            strategy,
-            inputCoder,
-            combiner.<String>asKeyedFn());
-
-    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
-    testHarness.open();
-
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-    testHarness.processWatermark(new Watermark(initialTime + 10000));
-    testHarness.processWatermark(new Watermark(initialTime + 20000));
-
-    return testHarness;
-  }
-
-  private static class ResultSortComparator implements Comparator<Object> {
-    @Override
-    public int compare(Object o1, Object o2) {
-      if (o1 instanceof Watermark && o2 instanceof Watermark) {
-        Watermark w1 = (Watermark) o1;
-        Watermark w2 = (Watermark) o2;
-        return (int) (w1.getTimestamp() - w2.getTimestamp());
-      } else {
-        StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
-        StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
-
-        int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis());
-        if (comparison != 0) {
-          return comparison;
-        }
-
-        comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
-        if(comparison == 0) {
-          comparison = Integer.compare(
-              sr0.getValue().getValue().getValue(),
-              sr1.getValue().getValue().getValue());
-        }
-        if(comparison == 0) {
-          Collection windowsA = sr0.getValue().getWindows();
-          Collection windowsB = sr1.getValue().getWindows();
-
-          if(windowsA.size() != 1 || windowsB.size() != 1) {
-            throw new IllegalStateException("A value cannot belong to more than one windows after grouping.");
-          }
-
-          BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
-          BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
-          comparison = Long.compare(windowA.maxTimestamp().getMillis(), windowB.maxTimestamp().getMillis());
-        }
-        return comparison;
-      }
-    }
-  }
-
-  private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
-                           T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-    final Instant inputTimestamp = timestamp;
-    final WindowFn windowFn = strategy.getWindowFn();
-
-    if (timestamp == null) {
-      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-    }
-
-    if (windows == null) {
-      try {
-        windows = windowFn.assignWindows(windowFn.new AssignContext() {
-          @Override
-          public Object element() {
-            throw new UnsupportedOperationException(
-                "WindowFn attempted to access input element when none was available");
-          }
-
-          @Override
-          public Instant timestamp() {
-            if (inputTimestamp == null) {
-              throw new UnsupportedOperationException(
-                  "WindowFn attempted to access input timestamp when none was available");
-            }
-            return inputTimestamp;
-          }
-
-          @Override
-          public Collection<? extends BoundedWindow> windows() {
-            throw new UnsupportedOperationException(
-                "WindowFn attempted to access input windows when none were available");
-          }
-        });
-      } catch (Exception e) {
-        throw UserCodeException.wrap(e);
-      }
-    }
-
-    return WindowedValue.of(output, timestamp, windows, pane);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
deleted file mode 100644
index 63e0bcf..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable {
-
-
-  protected String resultPath;
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
-  };
-
-  public GroupByNullKeyTest(){
-  }
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, String>, String> {
-    private static final long serialVersionUID = 0;
-
-    @Override
-    public void processElement(ProcessContext c) {
-      KV<Integer, String> record = c.element();
-      long now = System.currentTimeMillis();
-      int timestamp = record.getKey();
-      String userName = record.getValue();
-      if (userName != null) {
-        // Sets the implicit timestamp field to be used in windowing.
-        c.outputWithTimestamp(userName, new Instant(timestamp + now));
-      }
-    }
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForStreaming();
-
-    PCollection<String> output =
-      p.apply(Create.of(Arrays.asList(
-          KV.<Integer, String>of(0, "user1"),
-          KV.<Integer, String>of(1, "user1"),
-          KV.<Integer, String>of(2, "user1"),
-          KV.<Integer, String>of(10, "user2"),
-          KV.<Integer, String>of(1, "user2"),
-          KV.<Integer, String>of(15000, "user2"),
-          KV.<Integer, String>of(12000, "user2"),
-          KV.<Integer, String>of(25000, "user3"))))
-          .apply(ParDo.of(new ExtractUserAndTimestamp()))
-          .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
-              .triggering(AfterWatermark.pastEndOfWindow())
-              .withAllowedLateness(Duration.ZERO)
-              .discardingFiredPanes())
-
-          .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              String elem = c.element();
-              c.output(KV.<Void, String>of((Void) null, elem));
-            }
-          }))
-          .apply(GroupByKey.<Void, String>create())
-          .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              KV<Void, Iterable<String>> elem = c.element();
-              StringBuilder str = new StringBuilder();
-              str.append("k: " + elem.getKey() + " v:");
-              for (String v : elem.getValue()) {
-                str.append(" " + v);
-              }
-              c.output(str.toString());
-            }
-          }));
-    output.apply(TextIO.Write.to(resultPath));
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
deleted file mode 100644
index 77a8de6..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointReader;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointUtils;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointWriter;
-import com.google.cloud.dataflow.sdk.coders.*;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.state.*;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.joda.time.Instant;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import static org.junit.Assert.assertEquals;
-
-public class StateSerializationTest {
-
-  private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
-  private static final String KEY_PREFIX = "TEST_";
-
-  // TODO: This can be replaced with the standard Sum.SumIntererFn once the state no longer needs
-  // to create a StateTag at the point of restoring state. Currently StateTags are compared strictly
-  // by type and combiners always use KeyedCombineFnWithContext rather than KeyedCombineFn or CombineFn.
-  private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer> SUM_COMBINER =
-    new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], Integer>() {
-      @Override
-      public int[] createAccumulator(Object key, CombineWithContext.Context c) {
-        return new int[1];
-      }
-
-      @Override
-      public int[] addInput(Object key, int[] accumulator, Integer value, CombineWithContext.Context c) {
-        accumulator[0] += value;
-        return accumulator;
-      }
-
-      @Override
-      public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, CombineWithContext.Context c) {
-        int[] r = new int[1];
-        for (int[] a : accumulators) {
-          r[0] += a[0];
-        }
-        return r;
-      }
-
-      @Override
-      public Integer extractOutput(Object key, int[] accumulator, CombineWithContext.Context c) {
-        return accumulator[0];
-      }
-    };
-
-  private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
-    VarIntCoder.of(),
-    new DelegateCoder.CodingFunction<int[], Integer>() {
-      @Override
-      public Integer apply(int[] accumulator) {
-        return accumulator[0];
-      }
-    },
-    new DelegateCoder.CodingFunction<Integer, int[]>() {
-      @Override
-      public int[] apply(Integer value) {
-        int[] a = new int[1];
-        a[0] = value;
-        return a;
-      }
-    });
-
-  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
-    StateTags.value("stringValue", StringUtf8Coder.of());
-  private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
-    StateTags.value("stringValue", VarIntCoder.of());
-  private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
-    StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, SUM_COMBINER);
-  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
-    StateTags.bag("stringBag", StringUtf8Coder.of());
-  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
-    StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
-
-  private Map<String, FlinkStateInternals<String>> statePerKey = new HashMap<>();
-
-  private Map<String, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
-
-  private void initializeStateAndTimers() throws CannotProvideCoderException {
-    for (int i = 0; i < 10; i++) {
-      String key = KEY_PREFIX + i;
-
-      FlinkStateInternals state = initializeStateForKey(key);
-      Set<TimerInternals.TimerData> timers = new HashSet<>();
-      for (int j = 0; j < 5; j++) {
-        TimerInternals.TimerData timer = TimerInternals
-          .TimerData.of(NAMESPACE_1,
-            new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
-        timers.add(timer);
-      }
-
-      statePerKey.put(key, state);
-      activeTimers.put(key, timers);
-    }
-  }
-
-  private FlinkStateInternals<String> initializeStateForKey(String key) throws CannotProvideCoderException {
-    FlinkStateInternals<String> state = createState(key);
-
-    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    value.write("test");
-
-    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
-    value2.write(4);
-    value2.write(5);
-
-    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    combiningValue.add(1);
-    combiningValue.add(2);
-
-    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-    watermark.add(new Instant(1000));
-
-    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
-    bag.add("v1");
-    bag.add("v2");
-    bag.add("v3");
-    bag.add("v4");
-    return state;
-  }
-
-  private boolean restoreAndTestState(DataInputView in) throws Exception {
-    StateCheckpointReader reader = new StateCheckpointReader(in);
-    final ClassLoader userClassloader = this.getClass().getClassLoader();
-    Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
-    Coder<String> keyCoder = StringUtf8Coder.of();
-
-    boolean comparisonRes = true;
-
-    for (String key : statePerKey.keySet()) {
-      comparisonRes &= checkStateForKey(key);
-    }
-
-    // restore the timers
-    Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
-    if (activeTimers.size() != restoredTimersPerKey.size()) {
-      return false;
-    }
-
-    for (String key : statePerKey.keySet()) {
-      Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
-      Set<TimerInternals.TimerData> restoredTimers = restoredTimersPerKey.get(key);
-      comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
-    }
-
-    // restore the state
-    Map<String, FlinkStateInternals<String>> restoredPerKeyState =
-      StateCheckpointUtils.decodeState(reader, OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, userClassloader);
-    if (restoredPerKeyState.size() != statePerKey.size()) {
-      return false;
-    }
-
-    for (String key : statePerKey.keySet()) {
-      FlinkStateInternals<String> originalState = statePerKey.get(key);
-      FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
-      comparisonRes &= checkStateForKey(originalState, restoredState);
-    }
-    return comparisonRes;
-  }
-
-  private boolean checkStateForKey(String key) throws CannotProvideCoderException {
-    FlinkStateInternals<String> state = statePerKey.get(key);
-
-    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    boolean comp = value.read().equals("test");
-
-    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
-    comp &= value2.read().equals(5);
-
-    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    comp &= combiningValue.read().equals(3);
-
-    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-    comp &= watermark.read().equals(new Instant(1000));
-
-    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
-    Iterator<String> it = bag.read().iterator();
-    int i = 0;
-    while (it.hasNext()) {
-      comp &= it.next().equals("v" + (++i));
-    }
-    return comp;
-  }
-
-  private void storeState(AbstractStateBackend.CheckpointStateOutputView out) throws Exception {
-    StateCheckpointWriter checkpointBuilder = StateCheckpointWriter.create(out);
-    Coder<String> keyCoder = StringUtf8Coder.of();
-
-    // checkpoint the timers
-    StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, keyCoder);
-
-    // checkpoint the state
-    StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
-  }
-
-  private boolean checkTimersForKey(Set<TimerInternals.TimerData> originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
-    boolean comp = true;
-    if (restoredTimers == null) {
-      return false;
-    }
-
-    if (originalTimers.size() != restoredTimers.size()) {
-      return false;
-    }
-
-    for (TimerInternals.TimerData timer : originalTimers) {
-      comp &= restoredTimers.contains(timer);
-    }
-    return comp;
-  }
-
-  private boolean checkStateForKey(FlinkStateInternals<String> originalState, FlinkStateInternals<String> restoredState) throws CannotProvideCoderException {
-    if (restoredState == null) {
-      return false;
-    }
-
-    ValueState<String> orValue = originalState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    ValueState<String> resValue = restoredState.state(NAMESPACE_1, STRING_VALUE_ADDR);
-    boolean comp = orValue.read().equals(resValue.read());
-
-    ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
-    ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
-    comp &= orIntValue.read().equals(resIntValue.read());
-
-    AccumulatorCombiningState<Integer, int[], Integer> combOrValue = originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    AccumulatorCombiningState<Integer, int[], Integer> combResValue = restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-    comp &= combOrValue.read().equals(combResValue.read());
-
-    WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-    WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-    comp &= orWatermark.read().equals(resWatermark.read());
-
-    BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
-    BagState<String> resBag = restoredState.state(NAMESPACE_1, STRING_BAG_ADDR);
-
-    Iterator<String> orIt = orBag.read().iterator();
-    Iterator<String> resIt = resBag.read().iterator();
-
-    while (orIt.hasNext() && resIt.hasNext()) {
-      comp &= orIt.next().equals(resIt.next());
-    }
-
-    return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && resIt.hasNext())) && comp;
-  }
-
-  private FlinkStateInternals<String> createState(String key) throws CannotProvideCoderException {
-    return new FlinkStateInternals<>(
-      key,
-      StringUtf8Coder.of(),
-      IntervalWindow.getCoder(),
-      OutputTimeFns.outputAtEarliestInputTimestamp());
-  }
-
-  @Test
-  public void test() throws Exception {
-    StateSerializationTest test = new StateSerializationTest();
-    test.initializeStateAndTimers();
-
-    MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
-    AbstractStateBackend.CheckpointStateOutputView out = new AbstractStateBackend.CheckpointStateOutputView(memBackend);
-
-    test.storeState(out);
-
-    byte[] contents = memBackend.closeAndGetBytes();
-    DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
-
-    assertEquals(test.restoreAndTestState(in), true);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
deleted file mode 100644
index 83c1661..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.streaming;
-
-import org.apache.beam.runners.flink.FlinkTestPipeline;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Joiner;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Arrays;
-
-
-/**
- * Session window test
- */
-public class TopWikipediaSessionsITCase extends StreamingProgramTestBase 
implements Serializable {
-  protected String resultPath;
-
-  public TopWikipediaSessionsITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "user: user1 value:3",
-      "user: user1 value:1",
-      "user: user2 value:4",
-      "user: user2 value:6",
-      "user: user3 value:7",
-      "user: user3 value:2"
-  };
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result");
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-
-    Pipeline p = FlinkTestPipeline.createForStreaming();
-
-    Long now = (System.currentTimeMillis() + 10000) / 1000;
-
-    PCollection<KV<String, Long>> output =
-      p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", 
now + 10).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", 
now + 2).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", 
now).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 1).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 5).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 7).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 8).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 200).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 230).set
-          ("contributor_username", "user1"), new TableRow().set("timestamp", 
now + 230).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 240).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now + 245).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now + 235).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now + 236).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now + 237).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now + 238).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now + 239).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now + 240).set
-          ("contributor_username", "user3"), new TableRow().set("timestamp", 
now + 241).set
-          ("contributor_username", "user2"), new TableRow().set("timestamp", 
now)
-          .set("contributor_username", "user3"))))
-
-
-
-      .apply(ParDo.of(new DoFn<TableRow, String>() {
-        @Override
-        public void processElement(ProcessContext c) throws Exception {
-          TableRow row = c.element();
-          long timestamp = (Integer) row.get("timestamp");
-          String userName = (String) row.get("contributor_username");
-          if (userName != null) {
-            // Sets the timestamp field to be used in windowing.
-            c.outputWithTimestamp(userName, new Instant(timestamp * 1000L));
-          }
-        }
-      }))
-
-      
.apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1))))
-
-      .apply(Count.<String>perElement());
-
-    PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, 
Long>, String>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-        KV<String, Long> el = c.element();
-        String out = "user: " + el.getKey() + " value:" + el.getValue();
-        c.output(out);
-      }
-    }));
-
-    format.apply(TextIO.Write.to(resultPath));
-
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
deleted file mode 100644
index e850dd6..0000000
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.util;
-
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-/**
- * Copied from {@link com.google.cloud.dataflow.examples.JoinExamples} because 
the code
- * is private there.
- */
-public class JoinExamples {
-
-  // A 1000-row sample of the GDELT data here: gdelt-bq:full.events.
-  private static final String GDELT_EVENTS_TABLE =
-      "clouddataflow-readonly:samples.gdelt_sample";
-  // A table that maps country codes to country names.
-  private static final String COUNTRY_CODES =
-      "gdelt-bq:full.crosswalk_geocountrycodetohuman";
-
-  /**
-   * Join two collections, using country code as the key.
-   */
-  public static PCollection<String> joinEvents(PCollection<TableRow> 
eventsTable,
-                                        PCollection<TableRow> countryCodes) 
throws Exception {
-
-    final TupleTag<String> eventInfoTag = new TupleTag<>();
-    final TupleTag<String> countryInfoTag = new TupleTag<>();
-
-    // transform both input collections to tuple collections, where the keys 
are country
-    // codes in both cases.
-    PCollection<KV<String, String>> eventInfo = eventsTable.apply(
-        ParDo.of(new ExtractEventDataFn()));
-    PCollection<KV<String, String>> countryInfo = countryCodes.apply(
-        ParDo.of(new ExtractCountryInfoFn()));
-
-    // country code 'key' -> CGBKR (<event info>, <country name>)
-    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
-        .of(eventInfoTag, eventInfo)
-        .and(countryInfoTag, countryInfo)
-        .apply(CoGroupByKey.<String>create());
-
-    // Process the CoGbkResult elements generated by the CoGroupByKey 
transform.
-    // country code 'key' -> string of <event info>, <country name>
-    PCollection<KV<String, String>> finalResultCollection =
-        kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, 
KV<String, String>>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            KV<String, CoGbkResult> e = c.element();
-            CoGbkResult val = e.getValue();
-            String countryCode = e.getKey();
-            String countryName;
-            countryName = e.getValue().getOnly(countryInfoTag, "Kostas");
-            for (String eventInfo : 
c.element().getValue().getAll(eventInfoTag)) {
-              // Generate a string that combines information from both 
collection values
-              c.output(KV.of(countryCode, "Country name: " + countryName
-                  + ", Event info: " + eventInfo));
-            }
-          }
-        }));
-
-    // write to GCS
-    return finalResultCollection
-        .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
-          @Override
-          public void processElement(ProcessContext c) {
-            String outputstring = "Country code: " + c.element().getKey()
-                + ", " + c.element().getValue();
-            c.output(outputstring);
-          }
-        }));
-  }
-
-  /**
-   * Examines each row (event) in the input table. Output a KV with the key 
the country
-   * code of the event, and the value a string encoding event information.
-   */
-  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      String countryCode = (String) row.get("ActionGeo_CountryCode");
-      String sqlDate = (String) row.get("SQLDATE");
-      String actor1Name = (String) row.get("Actor1Name");
-      String sourceUrl = (String) row.get("SOURCEURL");
-      String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", 
url: " + sourceUrl;
-      c.output(KV.of(countryCode, eventInfo));
-    }
-  }
-
-
-  /**
-   * Examines each row (country info) in the input table. Output a KV with the 
key the country
-   * code, and the value the country name.
-   */
-  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> 
{
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = c.element();
-      String countryCode = (String) row.get("FIPSCC");
-      String countryName = (String) row.get("HumanName");
-      c.output(KV.of(countryCode, countryName));
-    }
-  }
-
-
-  /**
-   * Options supported by {@link JoinExamples}.
-   * <p>
-   * Inherits standard configuration options.
-   */
-  private interface Options extends PipelineOptions {
-    @Description("Path of the file to write to")
-    @Validation.Required
-    String getOutput();
-    void setOutput(String value);
-  }
-
-  public static void main(String[] args) throws Exception {
-    Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    Pipeline p = Pipeline.create(options);
-    // the following two 'applys' create multiple inputs to our pipeline, one 
for each
-    // of our two input sources.
-    PCollection<TableRow> eventsTable = 
p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
-    PCollection<TableRow> countryCodes = 
p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
-    PCollection<String> formattedResults = joinEvents(eventsTable, 
countryCodes);
-    formattedResults.apply(TextIO.Write.to(options.getOutput()));
-    p.run();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/resources/log4j-test.properties 
b/runners/flink/src/test/resources/log4j-test.properties
deleted file mode 100644
index 4c74d85..0000000
--- a/runners/flink/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 980900b..4f07ceb 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -1,43 +1,43 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
 -->
 <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";
-        xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+    xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
 
-    <modelVersion>4.0.0</modelVersion>
+  <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-        <groupId>com.google.cloud.dataflow</groupId>
-        <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
-        <version>1.6.0-SNAPSHOT</version>
-    </parent>
-
-    <groupId>org.apache.beam</groupId>
-    <artifactId>runners</artifactId>
+  <parent>
+    <groupId>com.google.cloud.dataflow</groupId>
+    <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
     <version>1.6.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.beam</groupId>
+  <artifactId>runners-parent</artifactId>
+  <version>1.6.0-SNAPSHOT</version>
 
-    <packaging>pom</packaging>
+  <packaging>pom</packaging>
 
-    <name>Beam Runners</name>
+  <name>Beam Runners</name>
 
-    <modules>
-        <module>flink</module>
-    </modules>
+  <modules>
+    <module>flink</module>
+  </modules>
 
 </project>

Reply via email to