http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index 01f9c32..fb240f4 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -43,464 +43,464 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class GroupAlsoByWindowTest {
 
-       private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
-
-       private final WindowingStrategy 
slidingWindowWithAfterWatermarkTriggerStrategy =
-                       
WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
-                                       
.withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-       private final WindowingStrategy sessionWindowingStrategy =
-                       
WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
-                                       
.withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
-                                       
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
-                                       
.withAllowedLateness(Duration.standardSeconds(100));
-
-       private final WindowingStrategy fixedWindowingStrategy =
-                       
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
-
-       private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
-                       
fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
-
-       private final WindowingStrategy 
fixedWindowWithAfterWatermarkTriggerStrategy =
-                       
fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
-
-       private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
-               fixedWindowingStrategy.withTrigger(
-                       
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
-                               
.withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
-
-       /**
-        * The default accumulation mode is
-        * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
-        * This strategy changes it to
-        * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
-        */
-       private final WindowingStrategy 
fixedWindowWithCompoundTriggerStrategyAcc =
-                       fixedWindowWithCompoundTriggerStrategy
-                                       
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-       @Test
-       public void testWithLateness() throws Exception {
-               WindowingStrategy strategy = 
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
-                               
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
-                               .withAllowedLateness(Duration.millis(1000));
-               long initialTime = 0L;
-               Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-               KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
-               FlinkGroupAlsoByWindowWrapper gbwOperaror =
-                               FlinkGroupAlsoByWindowWrapper.createForTesting(
-                                               pipeline.getOptions(),
-                                               pipeline.getCoderRegistry(),
-                                               strategy,
-                                               inputCoder,
-                                               combiner.<String>asKeyedFn());
-
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(gbwOperaror);
-               testHarness.open();
-
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processWatermark(new Watermark(initialTime + 2000));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processWatermark(new Watermark(initialTime + 4000));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 4),
-                                               new Instant(initialTime + 1),
-                                               new IntervalWindow(new 
Instant(0), new Instant(2000)),
-                                               PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
-                               , initialTime + 1));
-               expectedOutput.add(new Watermark(initialTime + 2000));
-
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 5),
-                                               new Instant(initialTime + 1999),
-                                               new IntervalWindow(new 
Instant(0), new Instant(2000)),
-                                               PaneInfo.createPane(false, 
false, PaneInfo.Timing.LATE, 1, 1))
-                               , initialTime + 1999));
-
-
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 6),
-                                               new Instant(initialTime + 1999),
-                                               new IntervalWindow(new 
Instant(0), new Instant(2000)),
-                                               PaneInfo.createPane(false, 
false, PaneInfo.Timing.LATE, 2, 2))
-                               , initialTime + 1999));
-               expectedOutput.add(new Watermark(initialTime + 4000));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       public void testSessionWindows() throws Exception {
-               WindowingStrategy strategy = sessionWindowingStrategy;
-
-               long initialTime = 0L;
-               Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-               KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
-               FlinkGroupAlsoByWindowWrapper gbwOperaror =
-                               FlinkGroupAlsoByWindowWrapper.createForTesting(
-                                               pipeline.getOptions(),
-                                               pipeline.getCoderRegistry(),
-                                               strategy,
-                                               inputCoder,
-                                               combiner.<String>asKeyedFn());
-
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(gbwOperaror);
-               testHarness.open();
-
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processWatermark(new Watermark(initialTime + 6000));
-
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-               testHarness.processWatermark(new Watermark(initialTime + 
12000));
-
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 6),
-                                               new Instant(initialTime + 1),
-                                               new IntervalWindow(new 
Instant(1), new Instant(5700)),
-                                               PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
-                               , initialTime + 1));
-               expectedOutput.add(new Watermark(initialTime + 6000));
-
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 11),
-                                               new Instant(initialTime + 6700),
-                                               new IntervalWindow(new 
Instant(1), new Instant(10900)),
-                                               PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
-                               , initialTime + 6700));
-               expectedOutput.add(new Watermark(initialTime + 12000));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       public void testSlidingWindows() throws Exception {
-               WindowingStrategy strategy = 
slidingWindowWithAfterWatermarkTriggerStrategy;
-               long initialTime = 0L;
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               createTestingOperatorAndState(strategy, 
initialTime);
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-               testHarness.processWatermark(new Watermark(initialTime + 
25000));
-
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 6),
-                                               new Instant(initialTime + 5000),
-                                               new IntervalWindow(new 
Instant(0), new Instant(10000)),
-                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime + 5000));
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 6),
-                                               new Instant(initialTime + 1),
-                                               new IntervalWindow(new 
Instant(-5000), new Instant(5000)),
-                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime + 1));
-               expectedOutput.add(new Watermark(initialTime + 10000));
-
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 11),
-                                               new Instant(initialTime + 
15000),
-                                               new IntervalWindow(new 
Instant(10000), new Instant(20000)),
-                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime + 15000));
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 3),
-                                               new Instant(initialTime + 
10000),
-                                               new IntervalWindow(new 
Instant(5000), new Instant(15000)),
-                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime + 10000));
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key2", 1),
-                                               new Instant(initialTime + 
19500),
-                                               new IntervalWindow(new 
Instant(10000), new Instant(20000)),
-                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime + 19500));
-               expectedOutput.add(new Watermark(initialTime + 20000));
-
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key2", 1),
-                                               new Instant(initialTime + 
20000),
-                                               /**
-                                                * this is 20000 and not 19500 
because of a convention in dataflow where
-                                                * timestamps of windowed 
values in a window cannot be smaller than the
-                                                * end of a previous window. 
Checkout the documentation of the
-                                                * {@link 
WindowFn#getOutputTime(Instant, BoundedWindow)}
-                                                */
-                                               new IntervalWindow(new 
Instant(15000), new Instant(25000)),
-                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime + 20000));
-               expectedOutput.add(new StreamRecord<>(
-                               WindowedValue.of(KV.of("key1", 8),
-                                               new Instant(initialTime + 
20000),
-                                               new IntervalWindow(new 
Instant(15000), new Instant(25000)),
-                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime + 20000));
-               expectedOutput.add(new Watermark(initialTime + 25000));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       public void testAfterWatermarkProgram() throws Exception {
-               WindowingStrategy strategy = 
fixedWindowWithAfterWatermarkTriggerStrategy;
-               long initialTime = 0L;
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               createTestingOperatorAndState(strategy, 
initialTime);
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
-               expectedOutput.add(new Watermark(initialTime + 10000));
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
10000));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
19500));
-               expectedOutput.add(new Watermark(initialTime + 20000));
-
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-               testHarness.close();
-       }
-
-       @Test
-       public void testAfterCountProgram() throws Exception {
-               WindowingStrategy strategy = 
fixedWindowWithCountTriggerStrategy;
-
-               long initialTime = 0L;
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               createTestingOperatorAndState(strategy, 
initialTime);
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
-               expectedOutput.add(new Watermark(initialTime + 10000));
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 
19500));
-               expectedOutput.add(new Watermark(initialTime + 20000));
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testCompoundProgram() throws Exception {
-               WindowingStrategy strategy = 
fixedWindowWithCompoundTriggerStrategy;
-
-               long initialTime = 0L;
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               createTestingOperatorAndState(strategy, 
initialTime);
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               /**
-                * PaneInfo are:
-                *              isFirst (pane in window),
-                *              isLast, Timing (of triggering),
-                *              index (of pane in the window),
-                *              onTimeIndex (if it the 1st,2nd, ... pane that 
was fired on time)
-                * */
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 
19500));
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 
1200));
-
-               expectedOutput.add(new Watermark(initialTime + 10000));
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 
19500));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
19500));
-
-               expectedOutput.add(new Watermark(initialTime + 20000));
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testCompoundAccumulatingPanesProgram() throws Exception {
-               WindowingStrategy strategy = 
fixedWindowWithCompoundTriggerStrategyAcc;
-               long initialTime = 0L;
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               createTestingOperatorAndState(strategy, 
initialTime);
-               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 
19500));
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 
1200));
-
-               expectedOutput.add(new Watermark(initialTime + 10000));
-
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 
19500));
-               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
19500));
-
-               expectedOutput.add(new Watermark(initialTime + 20000));
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-               testHarness.close();
-       }
-
-       private OneInputStreamOperatorTestHarness 
createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) 
throws Exception {
-               Pipeline pipeline = FlinkTestPipeline.createForStreaming();
-
-               KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
-
-               FlinkGroupAlsoByWindowWrapper gbwOperaror =
-                               FlinkGroupAlsoByWindowWrapper.createForTesting(
-                                               pipeline.getOptions(),
-                                               pipeline.getCoderRegistry(),
-                                               strategy,
-                                               inputCoder,
-                                               combiner.<String>asKeyedFn());
-
-               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
-                               new 
OneInputStreamOperatorTestHarness<>(gbwOperaror);
-               testHarness.open();
-
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
-
-               testHarness.processWatermark(new Watermark(initialTime + 
10000));
-               testHarness.processWatermark(new Watermark(initialTime + 
20000));
-
-               return testHarness;
-       }
-
-       private static class ResultSortComparator implements Comparator<Object> 
{
-               @Override
-               public int compare(Object o1, Object o2) {
-                       if (o1 instanceof Watermark && o2 instanceof Watermark) 
{
-                               Watermark w1 = (Watermark) o1;
-                               Watermark w2 = (Watermark) o2;
-                               return (int) (w1.getTimestamp() - 
w2.getTimestamp());
-                       } else {
-                               StreamRecord<WindowedValue<KV<String, 
Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
-                               StreamRecord<WindowedValue<KV<String, 
Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
-
-                               int comparison = (int) 
(sr0.getValue().getTimestamp().getMillis() - 
sr1.getValue().getTimestamp().getMillis());
-                               if (comparison != 0) {
-                                       return comparison;
-                               }
-
-                               comparison = 
sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
-                               if(comparison == 0) {
-                                       comparison = Integer.compare(
-                                                       
sr0.getValue().getValue().getValue(),
-                                                       
sr1.getValue().getValue().getValue());
-                               }
-                               if(comparison == 0) {
-                                       Collection windowsA = 
sr0.getValue().getWindows();
-                                       Collection windowsB = 
sr1.getValue().getWindows();
-
-                                       if(windowsA.size() != 1 || 
windowsB.size() != 1) {
-                                               throw new 
IllegalStateException("A value cannot belong to more than one windows after 
grouping.");
-                                       }
-
-                                       BoundedWindow windowA = (BoundedWindow) 
windowsA.iterator().next();
-                                       BoundedWindow windowB = (BoundedWindow) 
windowsB.iterator().next();
-                                       comparison = 
Long.compare(windowA.maxTimestamp().getMillis(), 
windowB.maxTimestamp().getMillis());
-                               }
-                               return comparison;
-                       }
-               }
-       }
-
-       private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy 
strategy,
-                                                                               
                   T output, Instant timestamp, Collection<? extends 
BoundedWindow> windows, PaneInfo pane) {
-               final Instant inputTimestamp = timestamp;
-               final WindowFn windowFn = strategy.getWindowFn();
-
-               if (timestamp == null) {
-                       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-               }
-
-               if (windows == null) {
-                       try {
-                               windows = windowFn.assignWindows(windowFn.new 
AssignContext() {
-                                       @Override
-                                       public Object element() {
-                                               throw new 
UnsupportedOperationException(
-                                                               "WindowFn 
attempted to access input element when none was available");
-                                       }
-
-                                       @Override
-                                       public Instant timestamp() {
-                                               if (inputTimestamp == null) {
-                                                       throw new 
UnsupportedOperationException(
-                                                                       
"WindowFn attempted to access input timestamp when none was available");
-                                               }
-                                               return inputTimestamp;
-                                       }
-
-                                       @Override
-                                       public Collection<? extends 
BoundedWindow> windows() {
-                                               throw new 
UnsupportedOperationException(
-                                                               "WindowFn 
attempted to access input windows when none were available");
-                                       }
-                               });
-                       } catch (Exception e) {
-                               throw UserCodeException.wrap(e);
-                       }
-               }
-
-               return WindowedValue.of(output, timestamp, windows, pane);
-       }
+  private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
+
+  private final WindowingStrategy 
slidingWindowWithAfterWatermarkTriggerStrategy =
+      
WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
+          
.withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+  private final WindowingStrategy sessionWindowingStrategy =
+      
WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
+          .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+          
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+          .withAllowedLateness(Duration.standardSeconds(100));
+
+  private final WindowingStrategy fixedWindowingStrategy =
+      WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
+
+  private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
+      fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
+
+  private final WindowingStrategy fixedWindowWithAfterWatermarkTriggerStrategy 
=
+      fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
+
+  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
+    fixedWindowingStrategy.withTrigger(
+      
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(5))
+        .withLateFirings(AfterPane.elementCountAtLeast(5)).buildTrigger());
+
+  /**
+   * The default accumulation mode is
+   * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
+   * This strategy changes it to
+   * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
+   */
+  private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc =
+      fixedWindowWithCompoundTriggerStrategy
+          
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+  @Test
+  public void testWithLateness() throws Exception {
+    WindowingStrategy strategy = 
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
+        .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+        .withAllowedLateness(Duration.millis(1000));
+    long initialTime = 0L;
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 2000));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 4000));
+
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 4),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 2000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 5),
+            new Instant(initialTime + 1999),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
+        , initialTime + 1999));
+
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1999),
+            new IntervalWindow(new Instant(0), new Instant(2000)),
+            PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
+        , initialTime + 1999));
+    expectedOutput.add(new Watermark(initialTime + 4000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testSessionWindows() throws Exception {
+    WindowingStrategy strategy = sessionWindowingStrategy;
+
+    long initialTime = 0L;
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processWatermark(new Watermark(initialTime + 6000));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processWatermark(new Watermark(initialTime + 12000));
+
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(1), new Instant(5700)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 6000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 11),
+            new Instant(initialTime + 6700),
+            new IntervalWindow(new Instant(1), new Instant(10900)),
+            PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
+        , initialTime + 6700));
+    expectedOutput.add(new Watermark(initialTime + 12000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testSlidingWindows() throws Exception {
+    WindowingStrategy strategy = 
slidingWindowWithAfterWatermarkTriggerStrategy;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+    testHarness.processWatermark(new Watermark(initialTime + 25000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 5000),
+            new IntervalWindow(new Instant(0), new Instant(10000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 5000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 6),
+            new Instant(initialTime + 1),
+            new IntervalWindow(new Instant(-5000), new Instant(5000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 11),
+            new Instant(initialTime + 15000),
+            new IntervalWindow(new Instant(10000), new Instant(20000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 15000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 3),
+            new Instant(initialTime + 10000),
+            new IntervalWindow(new Instant(5000), new Instant(15000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key2", 1),
+            new Instant(initialTime + 19500),
+            new IntervalWindow(new Instant(10000), new Instant(20000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key2", 1),
+            new Instant(initialTime + 20000),
+            /**
+             * this is 20000 and not 19500 because of a convention in dataflow 
where
+             * timestamps of windowed values in a window cannot be smaller 
than the
+             * end of a previous window. Checkout the documentation of the
+             * {@link WindowFn#getOutputTime(Instant, BoundedWindow)}
+             */
+            new IntervalWindow(new Instant(15000), new Instant(25000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 20000));
+    expectedOutput.add(new StreamRecord<>(
+        WindowedValue.of(KV.of("key1", 8),
+            new Instant(initialTime + 20000),
+            new IntervalWindow(new Instant(15000), new Instant(25000)),
+            PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
+        , initialTime + 20000));
+    expectedOutput.add(new Watermark(initialTime + 25000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testAfterWatermarkProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithAfterWatermarkTriggerStrategy;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 6),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME)), initialTime + 1));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 11),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+    testHarness.close();
+  }
+
+  @Test
+  public void testAfterCountProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCountTriggerStrategy;
+
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, 
PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testCompoundProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategy;
+
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    /**
+     * PaneInfo are:
+     *     isFirst (pane in window),
+     *     isLast, Timing (of triggering),
+     *     index (of pane in the window),
+     *     onTimeIndex (if it the 1st,2nd, ... pane that was fired on time)
+     * */
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, 
PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1),
+        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testCompoundAccumulatingPanesProgram() throws Exception {
+    WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
+    long initialTime = 0L;
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        createTestingOperatorAndState(strategy, initialTime);
+    ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, 
PaneInfo.Timing.EARLY)), initialTime + 1));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 5),
+        new Instant(initialTime + 10000), null, PaneInfo.createPane(true, 
false, PaneInfo.Timing.EARLY)), initialTime + 10000));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 10),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 6),
+        new Instant(initialTime + 1200), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
+
+    expectedOutput.add(new Watermark(initialTime + 10000));
+
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 11),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(false, 
true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
+    expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1),
+        new Instant(initialTime + 19500), null, PaneInfo.createPane(true, 
true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
+
+    expectedOutput.add(new Watermark(initialTime + 20000));
+    TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", 
expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+    testHarness.close();
+  }
+
+  private OneInputStreamOperatorTestHarness 
createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) 
throws Exception {
+    Pipeline pipeline = FlinkTestPipeline.createForStreaming();
+
+    KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
+
+    FlinkGroupAlsoByWindowWrapper gbwOperaror =
+        FlinkGroupAlsoByWindowWrapper.createForTesting(
+            pipeline.getOptions(),
+            pipeline.getCoderRegistry(),
+            strategy,
+            inputCoder,
+            combiner.<String>asKeyedFn());
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Integer>>, 
WindowedValue<KV<String, Integer>>> testHarness =
+        new OneInputStreamOperatorTestHarness<>(gbwOperaror);
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key1", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processElement(new StreamRecord<>(makeWindowedValue(strategy, 
KV.of("key2", 1), new Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), 
initialTime + 20));
+
+    testHarness.processWatermark(new Watermark(initialTime + 10000));
+    testHarness.processWatermark(new Watermark(initialTime + 20000));
+
+    return testHarness;
+  }
+
+  private static class ResultSortComparator implements Comparator<Object> {
+    @Override
+    public int compare(Object o1, Object o2) {
+      if (o1 instanceof Watermark && o2 instanceof Watermark) {
+        Watermark w1 = (Watermark) o1;
+        Watermark w2 = (Watermark) o2;
+        return (int) (w1.getTimestamp() - w2.getTimestamp());
+      } else {
+        StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = 
(StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
+        StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = 
(StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
+
+        int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - 
sr1.getValue().getTimestamp().getMillis());
+        if (comparison != 0) {
+          return comparison;
+        }
+
+        comparison = 
sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+        if(comparison == 0) {
+          comparison = Integer.compare(
+              sr0.getValue().getValue().getValue(),
+              sr1.getValue().getValue().getValue());
+        }
+        if(comparison == 0) {
+          Collection windowsA = sr0.getValue().getWindows();
+          Collection windowsB = sr1.getValue().getWindows();
+
+          if(windowsA.size() != 1 || windowsB.size() != 1) {
+            throw new IllegalStateException("A value cannot belong to more 
than one windows after grouping.");
+          }
+
+          BoundedWindow windowA = (BoundedWindow) windowsA.iterator().next();
+          BoundedWindow windowB = (BoundedWindow) windowsB.iterator().next();
+          comparison = Long.compare(windowA.maxTimestamp().getMillis(), 
windowB.maxTimestamp().getMillis());
+        }
+        return comparison;
+      }
+    }
+  }
+
+  private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy strategy,
+                           T output, Instant timestamp, Collection<? extends 
BoundedWindow> windows, PaneInfo pane) {
+    final Instant inputTimestamp = timestamp;
+    final WindowFn windowFn = strategy.getWindowFn();
+
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    if (windows == null) {
+      try {
+        windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public Object element() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input element when none was 
available");
+          }
+
+          @Override
+          public Instant timestamp() {
+            if (inputTimestamp == null) {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input timestamp when none was 
available");
+            }
+            return inputTimestamp;
+          }
+
+          @Override
+          public Collection<? extends BoundedWindow> windows() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input windows when none were 
available");
+          }
+        });
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    return WindowedValue.of(output, timestamp, windows, pane);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
index 5a412aa..52e9e25 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupByNullKeyTest.java
@@ -39,83 +39,83 @@ import java.util.Arrays;
 public class GroupByNullKeyTest extends StreamingProgramTestBase implements 
Serializable {
 
 
-       protected String resultPath;
+  protected String resultPath;
 
-       static final String[] EXPECTED_RESULT = new String[] {
-                       "k: null v: user1 user1 user1 user2 user2 user2 user2 
user3"
-       };
+  static final String[] EXPECTED_RESULT = new String[] {
+      "k: null v: user1 user1 user1 user2 user2 user2 user2 user3"
+  };
 
-       public GroupByNullKeyTest(){
-       }
+  public GroupByNullKeyTest(){
+  }
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
 
-       public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, 
String>, String> {
-               private static final long serialVersionUID = 0;
+  public static class ExtractUserAndTimestamp extends DoFn<KV<Integer, 
String>, String> {
+    private static final long serialVersionUID = 0;
 
-               @Override
-               public void processElement(ProcessContext c) {
-                       KV<Integer, String> record = c.element();
-                       long now = System.currentTimeMillis();
-                       int timestamp = record.getKey();
-                       String userName = record.getValue();
-                       if (userName != null) {
-                               // Sets the implicit timestamp field to be used 
in windowing.
-                               c.outputWithTimestamp(userName, new 
Instant(timestamp + now));
-                       }
-               }
-       }
+    @Override
+    public void processElement(ProcessContext c) {
+      KV<Integer, String> record = c.element();
+      long now = System.currentTimeMillis();
+      int timestamp = record.getKey();
+      String userName = record.getValue();
+      if (userName != null) {
+        // Sets the implicit timestamp field to be used in windowing.
+        c.outputWithTimestamp(userName, new Instant(timestamp + now));
+      }
+    }
+  }
 
-       @Override
-       protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-               Pipeline p = FlinkTestPipeline.createForStreaming();
+    Pipeline p = FlinkTestPipeline.createForStreaming();
 
-               PCollection<String> output =
-                       p.apply(Create.of(Arrays.asList(
-                                       KV.<Integer, String>of(0, "user1"),
-                                       KV.<Integer, String>of(1, "user1"),
-                                       KV.<Integer, String>of(2, "user1"),
-                                       KV.<Integer, String>of(10, "user2"),
-                                       KV.<Integer, String>of(1, "user2"),
-                                       KV.<Integer, String>of(15000, "user2"),
-                                       KV.<Integer, String>of(12000, "user2"),
-                                       KV.<Integer, String>of(25000, 
"user3"))))
-                                       .apply(ParDo.of(new 
ExtractUserAndTimestamp()))
-                                       
.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
-                                                       
.triggering(AfterWatermark.pastEndOfWindow())
-                                                       
.withAllowedLateness(Duration.ZERO)
-                                                       .discardingFiredPanes())
+    PCollection<String> output =
+      p.apply(Create.of(Arrays.asList(
+          KV.<Integer, String>of(0, "user1"),
+          KV.<Integer, String>of(1, "user1"),
+          KV.<Integer, String>of(2, "user1"),
+          KV.<Integer, String>of(10, "user2"),
+          KV.<Integer, String>of(1, "user2"),
+          KV.<Integer, String>of(15000, "user2"),
+          KV.<Integer, String>of(12000, "user2"),
+          KV.<Integer, String>of(25000, "user3"))))
+          .apply(ParDo.of(new ExtractUserAndTimestamp()))
+          
.apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
+              .triggering(AfterWatermark.pastEndOfWindow())
+              .withAllowedLateness(Duration.ZERO)
+              .discardingFiredPanes())
 
-                                       .apply(ParDo.of(new DoFn<String, 
KV<Void, String>>() {
-                                               @Override
-                                               public void 
processElement(ProcessContext c) throws Exception {
-                                                       String elem = 
c.element();
-                                                       c.output(KV.<Void, 
String>of((Void) null, elem));
-                                               }
-                                       }))
-                                       .apply(GroupByKey.<Void, 
String>create())
-                                       .apply(ParDo.of(new DoFn<KV<Void, 
Iterable<String>>, String>() {
-                                               @Override
-                                               public void 
processElement(ProcessContext c) throws Exception {
-                                                       KV<Void, 
Iterable<String>> elem = c.element();
-                                                       StringBuilder str = new 
StringBuilder();
-                                                       str.append("k: " + 
elem.getKey() + " v:");
-                                                       for (String v : 
elem.getValue()) {
-                                                               str.append(" " 
+ v);
-                                                       }
-                                                       
c.output(str.toString());
-                                               }
-                                       }));
-               output.apply(TextIO.Write.to(resultPath));
-               p.run();
-       }
+          .apply(ParDo.of(new DoFn<String, KV<Void, String>>() {
+            @Override
+            public void processElement(ProcessContext c) throws Exception {
+              String elem = c.element();
+              c.output(KV.<Void, String>of((Void) null, elem));
+            }
+          }))
+          .apply(GroupByKey.<Void, String>create())
+          .apply(ParDo.of(new DoFn<KV<Void, Iterable<String>>, String>() {
+            @Override
+            public void processElement(ProcessContext c) throws Exception {
+              KV<Void, Iterable<String>> elem = c.element();
+              StringBuilder str = new StringBuilder();
+              str.append("k: " + elem.getKey() + " v:");
+              for (String v : elem.getValue()) {
+                str.append(" " + v);
+              }
+              c.output(str.toString());
+            }
+          }));
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
index 7489fcc..d5b1043 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
@@ -43,261 +43,261 @@ import static org.junit.Assert.assertEquals;
 
 public class StateSerializationTest {
 
-       private static final StateNamespace NAMESPACE_1 = 
StateNamespaces.global();
-       private static final String KEY_PREFIX = "TEST_";
-
-       // TODO: This can be replaced with the standard Sum.SumIntererFn once 
the state no longer needs
-       // to create a StateTag at the point of restoring state. Currently 
StateTags are compared strictly
-       // by type and combiners always use KeyedCombineFnWithContext rather 
than KeyedCombineFn or CombineFn.
-       private static CombineWithContext.KeyedCombineFnWithContext<Object, 
Integer, int[], Integer> SUM_COMBINER =
-               new CombineWithContext.KeyedCombineFnWithContext<Object, 
Integer, int[], Integer>() {
-                       @Override
-                       public int[] createAccumulator(Object key, 
CombineWithContext.Context c) {
-                               return new int[1];
-                       }
-
-                       @Override
-                       public int[] addInput(Object key, int[] accumulator, 
Integer value, CombineWithContext.Context c) {
-                               accumulator[0] += value;
-                               return accumulator;
-                       }
-
-                       @Override
-                       public int[] mergeAccumulators(Object key, 
Iterable<int[]> accumulators, CombineWithContext.Context c) {
-                               int[] r = new int[1];
-                               for (int[] a : accumulators) {
-                                       r[0] += a[0];
-                               }
-                               return r;
-                       }
-
-                       @Override
-                       public Integer extractOutput(Object key, int[] 
accumulator, CombineWithContext.Context c) {
-                               return accumulator[0];
-                       }
-               };
-
-       private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
-               VarIntCoder.of(),
-               new DelegateCoder.CodingFunction<int[], Integer>() {
-                       @Override
-                       public Integer apply(int[] accumulator) {
-                               return accumulator[0];
-                       }
-               },
-               new DelegateCoder.CodingFunction<Integer, int[]>() {
-                       @Override
-                       public int[] apply(Integer value) {
-                               int[] a = new int[1];
-                               a[0] = value;
-                               return a;
-                       }
-               });
-
-       private static final StateTag<Object, ValueState<String>> 
STRING_VALUE_ADDR =
-               StateTags.value("stringValue", StringUtf8Coder.of());
-       private static final StateTag<Object, ValueState<Integer>> 
INT_VALUE_ADDR =
-               StateTags.value("stringValue", VarIntCoder.of());
-       private static final StateTag<Object, 
AccumulatorCombiningState<Integer, int[], Integer>> SUM_INTEGER_ADDR =
-               StateTags.keyedCombiningValueWithContext("sumInteger", 
INT_ACCUM_CODER, SUM_COMBINER);
-       private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR 
=
-               StateTags.bag("stringBag", StringUtf8Coder.of());
-       private static final StateTag<Object, 
WatermarkHoldState<BoundedWindow>> WATERMARK_BAG_ADDR =
-               StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtEarliestInputTimestamp());
-
-       private Map<String, FlinkStateInternals<String>> statePerKey = new 
HashMap<>();
-
-       private Map<String, Set<TimerInternals.TimerData>> activeTimers = new 
HashMap<>();
-
-       private void initializeStateAndTimers() throws 
CannotProvideCoderException {
-               for (int i = 0; i < 10; i++) {
-                       String key = KEY_PREFIX + i;
-
-                       FlinkStateInternals state = initializeStateForKey(key);
-                       Set<TimerInternals.TimerData> timers = new HashSet<>();
-                       for (int j = 0; j < 5; j++) {
-                               TimerInternals.TimerData timer = TimerInternals
-                                       .TimerData.of(NAMESPACE_1,
-                                               new Instant(1000 + i + j), 
TimeDomain.values()[j % 3]);
-                               timers.add(timer);
-                       }
-
-                       statePerKey.put(key, state);
-                       activeTimers.put(key, timers);
-               }
-       }
-
-       private FlinkStateInternals<String> initializeStateForKey(String key) 
throws CannotProvideCoderException {
-               FlinkStateInternals<String> state = createState(key);
-
-               ValueState<String> value = state.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
-               value.write("test");
-
-               ValueState<Integer> value2 = state.state(NAMESPACE_1, 
INT_VALUE_ADDR);
-               value2.write(4);
-               value2.write(5);
-
-               AccumulatorCombiningState<Integer, int[], Integer> 
combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-               combiningValue.add(1);
-               combiningValue.add(2);
-
-               WatermarkHoldState<BoundedWindow> watermark = 
state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-               watermark.add(new Instant(1000));
-
-               BagState<String> bag = state.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-               bag.add("v1");
-               bag.add("v2");
-               bag.add("v3");
-               bag.add("v4");
-               return state;
-       }
-
-       private boolean restoreAndTestState(DataInputView in) throws Exception {
-               StateCheckpointReader reader = new StateCheckpointReader(in);
-               final ClassLoader userClassloader = 
this.getClass().getClassLoader();
-               Coder<? extends BoundedWindow> windowCoder = 
IntervalWindow.getCoder();
-               Coder<String> keyCoder = StringUtf8Coder.of();
-
-               boolean comparisonRes = true;
-
-               for (String key : statePerKey.keySet()) {
-                       comparisonRes &= checkStateForKey(key);
-               }
-
-               // restore the timers
-               Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey 
= StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
-               if (activeTimers.size() != restoredTimersPerKey.size()) {
-                       return false;
-               }
-
-               for (String key : statePerKey.keySet()) {
-                       Set<TimerInternals.TimerData> originalTimers = 
activeTimers.get(key);
-                       Set<TimerInternals.TimerData> restoredTimers = 
restoredTimersPerKey.get(key);
-                       comparisonRes &= checkTimersForKey(originalTimers, 
restoredTimers);
-               }
-
-               // restore the state
-               Map<String, FlinkStateInternals<String>> restoredPerKeyState =
-                       StateCheckpointUtils.decodeState(reader, 
OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, 
userClassloader);
-               if (restoredPerKeyState.size() != statePerKey.size()) {
-                       return false;
-               }
-
-               for (String key : statePerKey.keySet()) {
-                       FlinkStateInternals<String> originalState = 
statePerKey.get(key);
-                       FlinkStateInternals<String> restoredState = 
restoredPerKeyState.get(key);
-                       comparisonRes &= checkStateForKey(originalState, 
restoredState);
-               }
-               return comparisonRes;
-       }
-
-       private boolean checkStateForKey(String key) throws 
CannotProvideCoderException {
-               FlinkStateInternals<String> state = statePerKey.get(key);
-
-               ValueState<String> value = state.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
-               boolean comp = value.read().equals("test");
-
-               ValueState<Integer> value2 = state.state(NAMESPACE_1, 
INT_VALUE_ADDR);
-               comp &= value2.read().equals(5);
-
-               AccumulatorCombiningState<Integer, int[], Integer> 
combiningValue = state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-               comp &= combiningValue.read().equals(3);
-
-               WatermarkHoldState<BoundedWindow> watermark = 
state.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-               comp &= watermark.read().equals(new Instant(1000));
-
-               BagState<String> bag = state.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-               Iterator<String> it = bag.read().iterator();
-               int i = 0;
-               while (it.hasNext()) {
-                       comp &= it.next().equals("v" + (++i));
-               }
-               return comp;
-       }
-
-       private void storeState(AbstractStateBackend.CheckpointStateOutputView 
out) throws Exception {
-               StateCheckpointWriter checkpointBuilder = 
StateCheckpointWriter.create(out);
-               Coder<String> keyCoder = StringUtf8Coder.of();
-
-               // checkpoint the timers
-               StateCheckpointUtils.encodeTimers(activeTimers, 
checkpointBuilder, keyCoder);
-
-               // checkpoint the state
-               StateCheckpointUtils.encodeState(statePerKey, 
checkpointBuilder, keyCoder);
-       }
-
-       private boolean checkTimersForKey(Set<TimerInternals.TimerData> 
originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
-               boolean comp = true;
-               if (restoredTimers == null) {
-                       return false;
-               }
-
-               if (originalTimers.size() != restoredTimers.size()) {
-                       return false;
-               }
-
-               for (TimerInternals.TimerData timer : originalTimers) {
-                       comp &= restoredTimers.contains(timer);
-               }
-               return comp;
-       }
-
-       private boolean checkStateForKey(FlinkStateInternals<String> 
originalState, FlinkStateInternals<String> restoredState) throws 
CannotProvideCoderException {
-               if (restoredState == null) {
-                       return false;
-               }
-
-               ValueState<String> orValue = originalState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
-               ValueState<String> resValue = restoredState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
-               boolean comp = orValue.read().equals(resValue.read());
-
-               ValueState<Integer> orIntValue = 
originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
-               ValueState<Integer> resIntValue = 
restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
-               comp &= orIntValue.read().equals(resIntValue.read());
-
-               AccumulatorCombiningState<Integer, int[], Integer> combOrValue 
= originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-               AccumulatorCombiningState<Integer, int[], Integer> combResValue 
= restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
-               comp &= combOrValue.read().equals(combResValue.read());
-
-               WatermarkHoldState orWatermark = 
originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-               WatermarkHoldState resWatermark = 
restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
-               comp &= orWatermark.read().equals(resWatermark.read());
-
-               BagState<String> orBag = originalState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-               BagState<String> resBag = restoredState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
-
-               Iterator<String> orIt = orBag.read().iterator();
-               Iterator<String> resIt = resBag.read().iterator();
-
-               while (orIt.hasNext() && resIt.hasNext()) {
-                       comp &= orIt.next().equals(resIt.next());
-               }
-
-               return !((orIt.hasNext() && !resIt.hasNext()) || 
(!orIt.hasNext() && resIt.hasNext())) && comp;
-       }
-
-       private FlinkStateInternals<String> createState(String key) throws 
CannotProvideCoderException {
-               return new FlinkStateInternals<>(
-                       key,
-                       StringUtf8Coder.of(),
-                       IntervalWindow.getCoder(),
-                       OutputTimeFns.outputAtEarliestInputTimestamp());
-       }
-
-       @Test
-       public void test() throws Exception {
-               StateSerializationTest test = new StateSerializationTest();
-               test.initializeStateAndTimers();
+  private static final StateNamespace NAMESPACE_1 = StateNamespaces.global();
+  private static final String KEY_PREFIX = "TEST_";
+
+  // TODO: This can be replaced with the standard Sum.SumIntererFn once the 
state no longer needs
+  // to create a StateTag at the point of restoring state. Currently StateTags 
are compared strictly
+  // by type and combiners always use KeyedCombineFnWithContext rather than 
KeyedCombineFn or CombineFn.
+  private static CombineWithContext.KeyedCombineFnWithContext<Object, Integer, 
int[], Integer> SUM_COMBINER =
+    new CombineWithContext.KeyedCombineFnWithContext<Object, Integer, int[], 
Integer>() {
+      @Override
+      public int[] createAccumulator(Object key, CombineWithContext.Context c) 
{
+        return new int[1];
+      }
+
+      @Override
+      public int[] addInput(Object key, int[] accumulator, Integer value, 
CombineWithContext.Context c) {
+        accumulator[0] += value;
+        return accumulator;
+      }
+
+      @Override
+      public int[] mergeAccumulators(Object key, Iterable<int[]> accumulators, 
CombineWithContext.Context c) {
+        int[] r = new int[1];
+        for (int[] a : accumulators) {
+          r[0] += a[0];
+        }
+        return r;
+      }
+
+      @Override
+      public Integer extractOutput(Object key, int[] accumulator, 
CombineWithContext.Context c) {
+        return accumulator[0];
+      }
+    };
+
+  private static Coder<int[]> INT_ACCUM_CODER = DelegateCoder.of(
+    VarIntCoder.of(),
+    new DelegateCoder.CodingFunction<int[], Integer>() {
+      @Override
+      public Integer apply(int[] accumulator) {
+        return accumulator[0];
+      }
+    },
+    new DelegateCoder.CodingFunction<Integer, int[]>() {
+      @Override
+      public int[] apply(Integer value) {
+        int[] a = new int[1];
+        a[0] = value;
+        return a;
+      }
+    });
+
+  private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR =
+    StateTags.value("stringValue", StringUtf8Coder.of());
+  private static final StateTag<Object, ValueState<Integer>> INT_VALUE_ADDR =
+    StateTags.value("stringValue", VarIntCoder.of());
+  private static final StateTag<Object, AccumulatorCombiningState<Integer, 
int[], Integer>> SUM_INTEGER_ADDR =
+    StateTags.keyedCombiningValueWithContext("sumInteger", INT_ACCUM_CODER, 
SUM_COMBINER);
+  private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
+    StateTags.bag("stringBag", StringUtf8Coder.of());
+  private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> 
WATERMARK_BAG_ADDR =
+    StateTags.watermarkStateInternal("watermark", 
OutputTimeFns.outputAtEarliestInputTimestamp());
+
+  private Map<String, FlinkStateInternals<String>> statePerKey = new 
HashMap<>();
+
+  private Map<String, Set<TimerInternals.TimerData>> activeTimers = new 
HashMap<>();
+
+  private void initializeStateAndTimers() throws CannotProvideCoderException {
+    for (int i = 0; i < 10; i++) {
+      String key = KEY_PREFIX + i;
+
+      FlinkStateInternals state = initializeStateForKey(key);
+      Set<TimerInternals.TimerData> timers = new HashSet<>();
+      for (int j = 0; j < 5; j++) {
+        TimerInternals.TimerData timer = TimerInternals
+          .TimerData.of(NAMESPACE_1,
+            new Instant(1000 + i + j), TimeDomain.values()[j % 3]);
+        timers.add(timer);
+      }
+
+      statePerKey.put(key, state);
+      activeTimers.put(key, timers);
+    }
+  }
+
+  private FlinkStateInternals<String> initializeStateForKey(String key) throws 
CannotProvideCoderException {
+    FlinkStateInternals<String> state = createState(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    value.write("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    value2.write(4);
+    value2.write(5);
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = 
state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    combiningValue.add(1);
+    combiningValue.add(2);
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, 
WATERMARK_BAG_ADDR);
+    watermark.add(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    bag.add("v1");
+    bag.add("v2");
+    bag.add("v3");
+    bag.add("v4");
+    return state;
+  }
+
+  private boolean restoreAndTestState(DataInputView in) throws Exception {
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+    final ClassLoader userClassloader = this.getClass().getClassLoader();
+    Coder<? extends BoundedWindow> windowCoder = IntervalWindow.getCoder();
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    boolean comparisonRes = true;
+
+    for (String key : statePerKey.keySet()) {
+      comparisonRes &= checkStateForKey(key);
+    }
+
+    // restore the timers
+    Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey = 
StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+    if (activeTimers.size() != restoredTimersPerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      Set<TimerInternals.TimerData> originalTimers = activeTimers.get(key);
+      Set<TimerInternals.TimerData> restoredTimers = 
restoredTimersPerKey.get(key);
+      comparisonRes &= checkTimersForKey(originalTimers, restoredTimers);
+    }
+
+    // restore the state
+    Map<String, FlinkStateInternals<String>> restoredPerKeyState =
+      StateCheckpointUtils.decodeState(reader, 
OutputTimeFns.outputAtEarliestInputTimestamp(), keyCoder, windowCoder, 
userClassloader);
+    if (restoredPerKeyState.size() != statePerKey.size()) {
+      return false;
+    }
+
+    for (String key : statePerKey.keySet()) {
+      FlinkStateInternals<String> originalState = statePerKey.get(key);
+      FlinkStateInternals<String> restoredState = restoredPerKeyState.get(key);
+      comparisonRes &= checkStateForKey(originalState, restoredState);
+    }
+    return comparisonRes;
+  }
+
+  private boolean checkStateForKey(String key) throws 
CannotProvideCoderException {
+    FlinkStateInternals<String> state = statePerKey.get(key);
+
+    ValueState<String> value = state.state(NAMESPACE_1, STRING_VALUE_ADDR);
+    boolean comp = value.read().equals("test");
+
+    ValueState<Integer> value2 = state.state(NAMESPACE_1, INT_VALUE_ADDR);
+    comp &= value2.read().equals(5);
+
+    AccumulatorCombiningState<Integer, int[], Integer> combiningValue = 
state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combiningValue.read().equals(3);
+
+    WatermarkHoldState<BoundedWindow> watermark = state.state(NAMESPACE_1, 
WATERMARK_BAG_ADDR);
+    comp &= watermark.read().equals(new Instant(1000));
+
+    BagState<String> bag = state.state(NAMESPACE_1, STRING_BAG_ADDR);
+    Iterator<String> it = bag.read().iterator();
+    int i = 0;
+    while (it.hasNext()) {
+      comp &= it.next().equals("v" + (++i));
+    }
+    return comp;
+  }
+
+  private void storeState(AbstractStateBackend.CheckpointStateOutputView out) 
throws Exception {
+    StateCheckpointWriter checkpointBuilder = 
StateCheckpointWriter.create(out);
+    Coder<String> keyCoder = StringUtf8Coder.of();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, checkpointBuilder, 
keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(statePerKey, checkpointBuilder, keyCoder);
+  }
+
+  private boolean checkTimersForKey(Set<TimerInternals.TimerData> 
originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
+    boolean comp = true;
+    if (restoredTimers == null) {
+      return false;
+    }
+
+    if (originalTimers.size() != restoredTimers.size()) {
+      return false;
+    }
+
+    for (TimerInternals.TimerData timer : originalTimers) {
+      comp &= restoredTimers.contains(timer);
+    }
+    return comp;
+  }
+
+  private boolean checkStateForKey(FlinkStateInternals<String> originalState, 
FlinkStateInternals<String> restoredState) throws CannotProvideCoderException {
+    if (restoredState == null) {
+      return false;
+    }
+
+    ValueState<String> orValue = originalState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+    ValueState<String> resValue = restoredState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+    boolean comp = orValue.read().equals(resValue.read());
+
+    ValueState<Integer> orIntValue = originalState.state(NAMESPACE_1, 
INT_VALUE_ADDR);
+    ValueState<Integer> resIntValue = restoredState.state(NAMESPACE_1, 
INT_VALUE_ADDR);
+    comp &= orIntValue.read().equals(resIntValue.read());
+
+    AccumulatorCombiningState<Integer, int[], Integer> combOrValue = 
originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    AccumulatorCombiningState<Integer, int[], Integer> combResValue = 
restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+    comp &= combOrValue.read().equals(combResValue.read());
+
+    WatermarkHoldState orWatermark = originalState.state(NAMESPACE_1, 
WATERMARK_BAG_ADDR);
+    WatermarkHoldState resWatermark = restoredState.state(NAMESPACE_1, 
WATERMARK_BAG_ADDR);
+    comp &= orWatermark.read().equals(resWatermark.read());
+
+    BagState<String> orBag = originalState.state(NAMESPACE_1, STRING_BAG_ADDR);
+    BagState<String> resBag = restoredState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+
+    Iterator<String> orIt = orBag.read().iterator();
+    Iterator<String> resIt = resBag.read().iterator();
+
+    while (orIt.hasNext() && resIt.hasNext()) {
+      comp &= orIt.next().equals(resIt.next());
+    }
+
+    return !((orIt.hasNext() && !resIt.hasNext()) || (!orIt.hasNext() && 
resIt.hasNext())) && comp;
+  }
+
+  private FlinkStateInternals<String> createState(String key) throws 
CannotProvideCoderException {
+    return new FlinkStateInternals<>(
+      key,
+      StringUtf8Coder.of(),
+      IntervalWindow.getCoder(),
+      OutputTimeFns.outputAtEarliestInputTimestamp());
+  }
+
+  @Test
+  public void test() throws Exception {
+    StateSerializationTest test = new StateSerializationTest();
+    test.initializeStateAndTimers();
 
-               MemoryStateBackend.MemoryCheckpointOutputStream memBackend = 
new MemoryStateBackend.MemoryCheckpointOutputStream(32048);
-               AbstractStateBackend.CheckpointStateOutputView out = new 
AbstractStateBackend.CheckpointStateOutputView(memBackend);
-
-               test.storeState(out);
+    MemoryStateBackend.MemoryCheckpointOutputStream memBackend = new 
MemoryStateBackend.MemoryCheckpointOutputStream(32048);
+    AbstractStateBackend.CheckpointStateOutputView out = new 
AbstractStateBackend.CheckpointStateOutputView(memBackend);
+
+    test.storeState(out);
 
-               byte[] contents = memBackend.closeAndGetBytes();
-               DataInputView in = new DataInputDeserializer(contents, 0, 
contents.length);
+    byte[] contents = memBackend.closeAndGetBytes();
+    DataInputView in = new DataInputDeserializer(contents, 0, contents.length);
 
-               assertEquals(test.restoreAndTestState(in), true);
-       }
+    assertEquals(test.restoreAndTestState(in), true);
+  }
 
 }

Reply via email to