http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
new file mode 100644
index 0000000..b667187
--- /dev/null
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -0,0 +1,507 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import com.dataartisans.flink.dataflow.FlinkTestPipeline;
+import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.*;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.common.base.Throwables;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GroupAlsoByWindowTest {
+
+       private final Combine.CombineFn combiner = new Sum.SumIntegerFn();
+
+       private final WindowingStrategy 
slidingWindowWithAfterWatermarkTriggerStrategy =
+                       
WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5)))
+                                       
.withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+       private final WindowingStrategy sessionWindowingStrategy =
+                       
WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2)))
+                                       
.withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+                                       
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+                                       
.withAllowedLateness(Duration.standardSeconds(100));
+
+       private final WindowingStrategy fixedWindowingStrategy =
+                       
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10)));
+
+       private final WindowingStrategy fixedWindowWithCountTriggerStrategy =
+                       
fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5));
+
+       private final WindowingStrategy 
fixedWindowWithAfterWatermarkTriggerStrategy =
+                       
fixedWindowingStrategy.withTrigger(AfterWatermark.pastEndOfWindow());
+
+       private final WindowingStrategy fixedWindowWithCompoundTriggerStrategy =
+                       fixedWindowingStrategy.withTrigger(
+                                       Repeatedly.forever(AfterFirst.of(
+                                                       
AfterPane.elementCountAtLeast(5),
+                                                       
AfterWatermark.pastEndOfWindow())));
+
+       /**
+        * The default accumulation mode is
+        * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}.
+        * This strategy changes it to
+        * {@link 
com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES}
+        */
+       private final WindowingStrategy 
fixedWindowWithCompoundTriggerStrategyAcc =
+                       fixedWindowWithCompoundTriggerStrategy
+                                       
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+       @Test
+       public void testWithLateness() throws Exception {
+               WindowingStrategy strategy = 
WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2)))
+                               
.withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES)
+                               .withAllowedLateness(Duration.millis(1000));
+               long initialTime = 0L;
+               Pipeline pipeline = FlinkTestPipeline.create();
+
+               KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+               FlinkGroupAlsoByWindowWrapper gbwOperaror =
+                               FlinkGroupAlsoByWindowWrapper.createForTesting(
+                                               pipeline.getOptions(),
+                                               pipeline.getCoderRegistry(),
+                                               strategy,
+                                               inputCoder,
+                                               combiner.<String>asKeyedFn());
+
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(gbwOperaror);
+               testHarness.open();
+
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processWatermark(new Watermark(initialTime + 2000));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processWatermark(new Watermark(initialTime + 4000));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 4),
+                                               new Instant(initialTime + 1),
+                                               new IntervalWindow(new 
Instant(0), new Instant(2000)),
+                                               PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
+                               , initialTime));
+               expectedOutput.add(new Watermark(initialTime + 2000));
+
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 6),
+                                               new Instant(initialTime + 1999),
+                                               new IntervalWindow(new 
Instant(0), new Instant(2000)),
+                                               PaneInfo.createPane(false, 
true, PaneInfo.Timing.LATE, 1, 1))
+                               , initialTime));
+
+
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 0),
+                                               new Instant(initialTime + 1999),
+                                               new IntervalWindow(new 
Instant(0), new Instant(2000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME, 0, 0))
+                               , initialTime));
+               expectedOutput.add(new Watermark(initialTime + 4000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testSessionWindows() throws Exception {
+               WindowingStrategy strategy = sessionWindowingStrategy;
+
+               long initialTime = 0L;
+               Pipeline pipeline = FlinkTestPipeline.create();
+
+               KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+               FlinkGroupAlsoByWindowWrapper gbwOperaror =
+                               FlinkGroupAlsoByWindowWrapper.createForTesting(
+                                               pipeline.getOptions(),
+                                               pipeline.getCoderRegistry(),
+                                               strategy,
+                                               inputCoder,
+                                               combiner.<String>asKeyedFn());
+
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(gbwOperaror);
+               testHarness.open();
+
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 3500), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 3700), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 2700), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processWatermark(new Watermark(initialTime + 6000));
+
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 6700), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 6800), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 8900), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 7600), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 5600), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+               testHarness.processWatermark(new Watermark(initialTime + 
12000));
+
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 6),
+                                               new Instant(initialTime + 1),
+                                               new IntervalWindow(new 
Instant(1), new Instant(5700)),
+                                               PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
+                               , initialTime));
+               expectedOutput.add(new Watermark(initialTime + 6000));
+
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 11),
+                                               new Instant(initialTime + 6700),
+                                               new IntervalWindow(new 
Instant(1), new Instant(10900)),
+                                               PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
+                               , initialTime));
+               expectedOutput.add(new Watermark(initialTime + 12000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testSlidingWindows() throws Exception {
+               WindowingStrategy strategy = 
slidingWindowWithAfterWatermarkTriggerStrategy;
+               long initialTime = 0L;
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               createTestingOperatorAndState(strategy, 
initialTime);
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+               testHarness.processWatermark(new Watermark(initialTime + 
25000));
+
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 6),
+                                               new Instant(initialTime + 5000),
+                                               new IntervalWindow(new 
Instant(0), new Instant(10000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
+                               , initialTime));
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 6),
+                                               new Instant(initialTime + 1),
+                                               new IntervalWindow(new 
Instant(-5000), new Instant(5000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
+                               , initialTime));
+               expectedOutput.add(new Watermark(initialTime + 10000));
+
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 11),
+                                               new Instant(initialTime + 
15000),
+                                               new IntervalWindow(new 
Instant(10000), new Instant(20000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
+                               , initialTime));
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 3),
+                                               new Instant(initialTime + 
10000),
+                                               new IntervalWindow(new 
Instant(5000), new Instant(15000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
+                               , initialTime));
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key2", 1),
+                                               new Instant(initialTime + 
19500),
+                                               new IntervalWindow(new 
Instant(10000), new Instant(20000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
+                               , initialTime));
+               expectedOutput.add(new Watermark(initialTime + 20000));
+
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key2", 1),
+                                               new Instant(initialTime + 
20000),
+                                               /**
+                                                * this is 20000 and not 19500 
because of a convention in dataflow where
+                                                * timestamps of windowed 
values in a window cannot be smaller than the
+                                                * end of a previous window. 
Checkout the documentation of the
+                                                * {@link 
WindowFn#getOutputTime(Instant, BoundedWindow)}
+                                                */
+                                               new IntervalWindow(new 
Instant(15000), new Instant(25000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
+                               , initialTime));
+               expectedOutput.add(new StreamRecord<>(
+                               WindowedValue.of(KV.of("key1", 8),
+                                               new Instant(initialTime + 
20000),
+                                               new IntervalWindow(new 
Instant(15000), new Instant(25000)),
+                                               PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
+                               , initialTime));
+               expectedOutput.add(new Watermark(initialTime + 25000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testAfterWatermarkProgram() throws Exception {
+               WindowingStrategy strategy = 
fixedWindowWithAfterWatermarkTriggerStrategy;
+               long initialTime = 0L;
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               createTestingOperatorAndState(strategy, 
initialTime);
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+               expectedOutput.add(new Watermark(initialTime + 10000));
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+               expectedOutput.add(new Watermark(initialTime + 20000));
+
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+               testHarness.close();
+       }
+
+       @Test
+       public void testAfterCountProgram() throws Exception {
+               WindowingStrategy strategy = 
fixedWindowWithCountTriggerStrategy;
+
+               long initialTime = 0L;
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               createTestingOperatorAndState(strategy, 
initialTime);
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+               expectedOutput.add(new Watermark(initialTime + 10000));
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.LATE, 0, 0)), initialTime));
+               expectedOutput.add(new Watermark(initialTime + 20000));
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testCompoundProgram() throws Exception {
+               WindowingStrategy strategy = 
fixedWindowWithCompoundTriggerStrategy;
+
+               long initialTime = 0L;
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               createTestingOperatorAndState(strategy, 
initialTime);
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               /**
+                * PaneInfo are:
+                *              isFirst (pane in window),
+                *              isLast, Timing (of triggering),
+                *              index (of pane in the window),
+                *              onTimeIndex (if it the 1st,2nd, ... pane that 
was fired on time)
+                * */
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
+                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+
+               expectedOutput.add(new Watermark(initialTime + 10000));
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+
+               expectedOutput.add(new Watermark(initialTime + 20000));
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+       }
+
+       @Test
+       public void testCompoundAccumulatingPanesProgram() throws Exception {
+               WindowingStrategy strategy = 
fixedWindowWithCompoundTriggerStrategyAcc;
+               long initialTime = 0L;
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               createTestingOperatorAndState(strategy, 
initialTime);
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
+                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+
+               expectedOutput.add(new Watermark(initialTime + 10000));
+
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+               expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+
+               expectedOutput.add(new Watermark(initialTime + 20000));
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
+
+               testHarness.close();
+       }
+
+       private OneInputStreamOperatorTestHarness 
createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) 
throws Exception {
+               Pipeline pipeline = FlinkTestPipeline.create();
+
+               KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
+
+               FlinkGroupAlsoByWindowWrapper gbwOperaror =
+                               FlinkGroupAlsoByWindowWrapper.createForTesting(
+                                               pipeline.getOptions(),
+                                               pipeline.getCoderRegistry(),
+                                               strategy,
+                                               inputCoder,
+                                               combiner.<String>asKeyedFn());
+
+               OneInputStreamOperatorTestHarness<WindowedValue<KV<String, 
Integer>>, WindowedValue<KV<String, Integer>>> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(gbwOperaror);
+               testHarness.open();
+
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1000), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 1200), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 10000), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 12100), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 14200), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 15300), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 16500), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+               testHarness.processElement(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), new 
Instant(initialTime + 19500), null, PaneInfo.NO_FIRING), initialTime + 20));
+
+               testHarness.processWatermark(new Watermark(initialTime + 
10000));
+               testHarness.processWatermark(new Watermark(initialTime + 
20000));
+
+               return testHarness;
+       }
+
+       private static class ResultSortComparator implements Comparator<Object> 
{
+               @Override
+               public int compare(Object o1, Object o2) {
+                       if (o1 instanceof Watermark && o2 instanceof Watermark) 
{
+                               Watermark w1 = (Watermark) o1;
+                               Watermark w2 = (Watermark) o2;
+                               return (int) (w1.getTimestamp() - 
w2.getTimestamp());
+                       } else {
+                               StreamRecord<WindowedValue<KV<String, 
Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
+                               StreamRecord<WindowedValue<KV<String, 
Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
+
+                               if (sr0.getTimestamp() != sr1.getTimestamp()) {
+                                       return (int) (sr0.getTimestamp() - 
sr1.getTimestamp());
+                               }
+
+                               int comparison = 
sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+                               if(comparison == 0) {
+                                       comparison = Integer.compare(
+                                                       
sr0.getValue().getValue().getValue(),
+                                                       
sr1.getValue().getValue().getValue());
+                               }
+                               if(comparison == 0) {
+                                       Collection windowsA = 
sr0.getValue().getWindows();
+                                       Collection windowsB = 
sr1.getValue().getWindows();
+
+                                       if(windowsA.size() != 1 || 
windowsB.size() != 1) {
+                                               throw new 
IllegalStateException("A value cannot belong to more than one windows after 
grouping.");
+                                       }
+
+                                       BoundedWindow windowA = (BoundedWindow) 
windowsA.iterator().next();
+                                       BoundedWindow windowB = (BoundedWindow) 
windowsB.iterator().next();
+                                       comparison = 
Long.compare(windowA.maxTimestamp().getMillis(), 
windowB.maxTimestamp().getMillis());
+                               }
+                               return comparison;
+                       }
+               }
+       }
+
+       private <T> WindowedValue<T> makeWindowedValue(WindowingStrategy 
strategy,
+                                                                               
                   T output, Instant timestamp, Collection<? extends 
BoundedWindow> windows, PaneInfo pane) {
+               final Instant inputTimestamp = timestamp;
+               final WindowFn windowFn = strategy.getWindowFn();
+
+               if (timestamp == null) {
+                       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+               }
+
+               if (windows == null) {
+                       try {
+                               windows = windowFn.assignWindows(windowFn.new 
AssignContext() {
+                                       @Override
+                                       public Object element() {
+                                               throw new 
UnsupportedOperationException(
+                                                               "WindowFn 
attempted to access input element when none was available"); // TODO: 12/16/15 
aljoscha's comment in slack
+                                       }
+
+                                       @Override
+                                       public Instant timestamp() {
+                                               if (inputTimestamp == null) {
+                                                       throw new 
UnsupportedOperationException(
+                                                                       
"WindowFn attempted to access input timestamp when none was available");
+                                               }
+                                               return inputTimestamp;
+                                       }
+
+                                       @Override
+                                       public Collection<? extends 
BoundedWindow> windows() {
+                                               throw new 
UnsupportedOperationException(
+                                                               "WindowFn 
attempted to access input windows when none were available");
+                                       }
+                               });
+                       } catch (Exception e) {
+                               Throwables.propagateIfInstanceOf(e, 
UserCodeException.class);
+                               throw new UserCodeException(e);
+                       }
+               }
+
+               return WindowedValue.of(output, timestamp, windows, pane);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
new file mode 100644
index 0000000..084ada2
--- /dev/null
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/StateSerializationTest.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.streaming;
+
+import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.FlinkStateInternals;
+import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.StateCheckpointReader;
+import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.StateCheckpointUtils;
+import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.state.StateCheckpointWriter;
+import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.state.*;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+public class StateSerializationTest {
+
+       private static final StateNamespace NAMESPACE_1 = 
StateNamespaces.global();
+       private static final String KEY_PREFIX = "TEST_";
+
+       private static final StateTag<ValueState<String>> STRING_VALUE_ADDR =
+                       StateTags.value("stringValue", StringUtf8Coder.of());
+       private static final StateTag<ValueState<Integer>> INT_VALUE_ADDR =
+                       StateTags.value("stringValue", VarIntCoder.of());
+       private static final StateTag<CombiningValueState<Integer, Integer>> 
SUM_INTEGER_ADDR =
+                       StateTags.combiningValueFromInputInternal(
+                                       "sumInteger", VarIntCoder.of(), new 
Sum.SumIntegerFn());
+       private static final StateTag<BagState<String>> STRING_BAG_ADDR =
+                       StateTags.bag("stringBag", StringUtf8Coder.of());
+       private static final StateTag<WatermarkStateInternal> 
WATERMARK_BAG_ADDR =
+                       StateTags.watermarkStateInternal("watermark");
+
+       private Combine.CombineFn combiner = new Sum.SumIntegerFn();
+
+       private Map<String, FlinkStateInternals<String>> statePerKey = new 
HashMap<>();
+
+       private Map<String, Set<TimerInternals.TimerData>> activeTimers = new 
HashMap<>();
+
+       private void initializeStateAndTimers() throws 
CannotProvideCoderException {
+               for (int i = 0; i < 10; i++) {
+                       String key = KEY_PREFIX + i;
+
+                       FlinkStateInternals state = initializeStateForKey(key);
+                       Set<TimerInternals.TimerData> timers = new HashSet<>();
+                       for (int j = 0; j < 5; j++) {
+                               TimerInternals.TimerData timer = TimerInternals
+                                               .TimerData.of(NAMESPACE_1,
+                                                               new 
Instant(1000 + i + j), TimeDomain.values()[j % 3]);
+                               timers.add(timer);
+                       }
+
+                       statePerKey.put(key, state);
+                       activeTimers.put(key, timers);
+               }
+       }
+
+       private FlinkStateInternals<String> initializeStateForKey(String key) 
throws CannotProvideCoderException {
+               FlinkStateInternals<String> state = createState(key);
+
+               ValueState<String> value = state.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+               value.set("test");
+
+               ValueState<Integer> value2 = state.state(NAMESPACE_1, 
INT_VALUE_ADDR);
+               value2.set(4);
+               value2.set(5);
+
+               CombiningValueState<Integer, Integer> combiningValue = 
state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+               combiningValue.add(1);
+               combiningValue.add(2);
+
+               WatermarkStateInternal watermark = state.state(NAMESPACE_1, 
WATERMARK_BAG_ADDR);
+               watermark.add(new Instant(1000));
+
+               BagState<String> bag = state.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+               bag.add("v1");
+               bag.add("v2");
+               bag.add("v3");
+               bag.add("v4");
+               return state;
+       }
+
+       private boolean restoreAndTestState(DataInputView in) throws Exception {
+               StateCheckpointReader reader = new StateCheckpointReader(in);
+               final ClassLoader userClassloader = 
this.getClass().getClassLoader();
+               Coder<? extends BoundedWindow> windowCoder = 
IntervalWindow.getCoder();
+               Coder<String> keyCoder = StringUtf8Coder.of();
+
+               boolean comparisonRes = true;
+
+               for(String key: statePerKey.keySet()) {
+                       comparisonRes &= checkStateForKey(key);
+               }
+
+               // restore the timers
+               Map<String, Set<TimerInternals.TimerData>> restoredTimersPerKey 
= StateCheckpointUtils.decodeTimers(reader, windowCoder, keyCoder);
+               if(activeTimers.size() != restoredTimersPerKey.size()) {
+                       return false;
+               }
+
+               for(String key: statePerKey.keySet()) {
+                       Set<TimerInternals.TimerData> originalTimers = 
activeTimers.get(key);
+                       Set<TimerInternals.TimerData> restoredTimers = 
restoredTimersPerKey.get(key);
+                       comparisonRes &= checkTimersForKey(originalTimers, 
restoredTimers);
+               }
+
+               // restore the state
+               Map<String, FlinkStateInternals<String>> restoredPerKeyState = 
StateCheckpointUtils.decodeState(reader, combiner.asKeyedFn(), keyCoder, 
windowCoder, userClassloader);
+               if(restoredPerKeyState.size() != statePerKey.size()) {
+                       return false;
+               }
+
+               for(String key: statePerKey.keySet()) {
+                       FlinkStateInternals<String> originalState = 
statePerKey.get(key);
+                       FlinkStateInternals<String> restoredState = 
restoredPerKeyState.get(key);
+                       comparisonRes &= checkStateForKey(originalState, 
restoredState);
+               }
+               return comparisonRes;
+       }
+
+       private boolean checkStateForKey(String key) throws 
CannotProvideCoderException {
+               FlinkStateInternals<String> state = statePerKey.get(key);
+
+               ValueState<String> value = state.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+               boolean comp = value.get().read().equals("test");
+
+               ValueState<Integer> value2 = state.state(NAMESPACE_1, 
INT_VALUE_ADDR);
+               comp &= value2.get().read().equals(5);
+
+               CombiningValueState<Integer, Integer> combiningValue = 
state.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+               comp &= combiningValue.get().read().equals(3);
+
+               WatermarkStateInternal watermark = state.state(NAMESPACE_1, 
WATERMARK_BAG_ADDR);
+               comp &= watermark.get().read().equals(new Instant(1000));
+
+               BagState<String> bag = state.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+               Iterator<String> it = bag.get().read().iterator();
+               int i = 0;
+               while(it.hasNext()) {
+                       comp &= it.next().equals("v"+ (++i));
+               }
+               return comp;
+       }
+
+       private void storeState(StateBackend.CheckpointStateOutputView out) 
throws Exception {
+               StateCheckpointWriter checkpointBuilder = 
StateCheckpointWriter.create(out);
+               Coder<String> keyCoder = StringUtf8Coder.of();
+
+               // checkpoint the timers
+               StateCheckpointUtils.encodeTimers(activeTimers, 
checkpointBuilder,keyCoder);
+
+               // checkpoint the state
+               StateCheckpointUtils.encodeState(statePerKey, 
checkpointBuilder, keyCoder);
+       }
+
+       private boolean checkTimersForKey(Set<TimerInternals.TimerData> 
originalTimers, Set<TimerInternals.TimerData> restoredTimers) {
+               boolean comp = true;
+               if(restoredTimers == null) {
+                       return false;
+               }
+
+               if(originalTimers.size() != restoredTimers.size()) {
+                       return false;
+               }
+
+               for(TimerInternals.TimerData timer: originalTimers) {
+                       comp &= restoredTimers.contains(timer);
+               }
+               return comp;
+       }
+
+       private boolean checkStateForKey(FlinkStateInternals<String> 
originalState, FlinkStateInternals<String> restoredState) throws 
CannotProvideCoderException {
+               if(restoredState == null) {
+                       return false;
+               }
+
+               ValueState<String> orValue = originalState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+               ValueState<String> resValue = restoredState.state(NAMESPACE_1, 
STRING_VALUE_ADDR);
+               boolean comp = 
orValue.get().read().equals(resValue.get().read());
+
+               ValueState<Integer> orIntValue = 
originalState.state(NAMESPACE_1, INT_VALUE_ADDR);
+               ValueState<Integer> resIntValue = 
restoredState.state(NAMESPACE_1, INT_VALUE_ADDR);
+               comp &= 
orIntValue.get().read().equals(resIntValue.get().read());
+
+               CombiningValueState<Integer, Integer> combOrValue = 
originalState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+               CombiningValueState<Integer, Integer> combResValue = 
restoredState.state(NAMESPACE_1, SUM_INTEGER_ADDR);
+               comp &= 
combOrValue.get().read().equals(combResValue.get().read());
+
+               WatermarkStateInternal orWatermark = 
originalState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+               WatermarkStateInternal resWatermark = 
restoredState.state(NAMESPACE_1, WATERMARK_BAG_ADDR);
+               comp &= 
orWatermark.get().read().equals(resWatermark.get().read());
+
+               BagState<String> orBag = originalState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+               BagState<String> resBag = restoredState.state(NAMESPACE_1, 
STRING_BAG_ADDR);
+
+               Iterator<String> orIt = orBag.get().read().iterator();
+               Iterator<String> resIt = resBag.get().read().iterator();
+
+               while (orIt.hasNext() && resIt.hasNext()) {
+                       comp &= orIt.next().equals(resIt.next());
+               }
+
+               return !((orIt.hasNext() && !resIt.hasNext()) || 
(!orIt.hasNext() && resIt.hasNext())) && comp;
+       }
+
+       private FlinkStateInternals<String> createState(String key) throws 
CannotProvideCoderException {
+               return new FlinkStateInternals<>(
+                               key,
+                               StringUtf8Coder.of(),
+                               IntervalWindow.getCoder(),
+                               combiner.asKeyedFn());
+       }
+
+       @Test
+       public void test() throws Exception {
+               StateSerializationTest test = new StateSerializationTest();
+               test.initializeStateAndTimers();
+
+               MemoryStateBackend.MemoryCheckpointOutputStream memBackend = 
new MemoryStateBackend.MemoryCheckpointOutputStream(25728);
+               StateBackend.CheckpointStateOutputView out = new 
StateBackend.CheckpointStateOutputView(memBackend);
+
+               test.storeState(out);
+
+               byte[] contents = memBackend.closeAndGetBytes();
+               ByteArrayInputView in = new ByteArrayInputView(contents);
+
+               assertEquals(test.restoreAndTestState(in), true);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
index cbf5d77..74f754b 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/util/JoinExamples.java
@@ -76,8 +76,8 @@ public class JoinExamples {
                                                KV<String, CoGbkResult> e = 
c.element();
                                                CoGbkResult val = e.getValue();
                                                String countryCode = e.getKey();
-                                               String countryName;
-                                               countryName = 
e.getValue().getOnly(countryInfoTag);
+                                               String countryName = "none";
+                                               countryName = 
e.getValue().getOnly(countryInfoTag, "Kostas");
                                                for (String eventInfo : 
c.element().getValue().getAll(eventInfoTag)) {
                                                        // Generate a string 
that combines information from both collection values
                                                        
c.output(KV.of(countryCode, "Country name: " + countryName


Reply via email to