http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index eabc307..7dae0b0 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -44,221 +44,221 @@ import java.util.Collection;
  * */
 public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends 
RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> {
 
-       private final DoFn<IN, OUTDF> doFn;
-       private final WindowingStrategy<?, ?> windowingStrategy;
-       private transient PipelineOptions options;
-
-       private DoFnProcessContext context;
-
-       public FlinkAbstractParDoWrapper(PipelineOptions options, 
WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
-               Preconditions.checkNotNull(options);
-               Preconditions.checkNotNull(windowingStrategy);
-               Preconditions.checkNotNull(doFn);
-
-               this.doFn = doFn;
-               this.options = options;
-               this.windowingStrategy = windowingStrategy;
-       }
-
-       private void initContext(DoFn<IN, OUTDF> function, 
Collector<WindowedValue<OUTFL>> outCollector) {
-               if (this.context == null) {
-                       this.context = new DoFnProcessContext(function, 
outCollector);
-               }
-       }
-
-       @Override
-       public void flatMap(WindowedValue<IN> value, 
Collector<WindowedValue<OUTFL>> out) throws Exception {
-               this.initContext(doFn, out);
-
-               // for each window the element belongs to, create a new copy 
here.
-               Collection<? extends BoundedWindow> windows = 
value.getWindows();
-               if (windows.size() <= 1) {
-                       processElement(value);
-               } else {
-                       for (BoundedWindow window : windows) {
-                               processElement(WindowedValue.of(
-                                               value.getValue(), 
value.getTimestamp(), window, value.getPane()));
-                       }
-               }
-       }
-
-       private void processElement(WindowedValue<IN> value) throws Exception {
-               this.context.setElement(value);
-               this.doFn.startBundle(context);
-               doFn.processElement(context);
-               this.doFn.finishBundle(context);
-       }
-
-       private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext 
{
-
-               private final DoFn<IN, OUTDF> fn;
-
-               protected final Collector<WindowedValue<OUTFL>> collector;
-
-               private WindowedValue<IN> element;
-
-               private DoFnProcessContext(DoFn<IN, OUTDF> function, 
Collector<WindowedValue<OUTFL>> outCollector) {
-                       function.super();
-                       super.setupDelegateAggregators();
-
-                       this.fn = function;
-                       this.collector = outCollector;
-               }
-
-               public void setElement(WindowedValue<IN> value) {
-                       this.element = value;
-               }
-
-               @Override
-               public IN element() {
-                       return this.element.getValue();
-               }
-
-               @Override
-               public Instant timestamp() {
-                       return this.element.getTimestamp();
-               }
-
-               @Override
-               public BoundedWindow window() {
-                       if (!(fn instanceof DoFn.RequiresWindowAccess)) {
-                               throw new UnsupportedOperationException(
-                                               "window() is only available in 
the context of a DoFn marked as RequiresWindow.");
-                       }
-
-                       Collection<? extends BoundedWindow> windows = 
this.element.getWindows();
-                       if (windows.size() != 1) {
-                               throw new IllegalArgumentException("Each 
element is expected to belong to 1 window. " +
-                                               "This belongs to " + 
windows.size() + ".");
-                       }
-                       return windows.iterator().next();
-               }
-
-               @Override
-               public PaneInfo pane() {
-                       return this.element.getPane();
-               }
-
-               @Override
-               public WindowingInternals<IN, OUTDF> windowingInternals() {
-                       return windowingInternalsHelper(element, collector);
-               }
-
-               @Override
-               public PipelineOptions getPipelineOptions() {
-                       return options;
-               }
-
-               @Override
-               public <T> T sideInput(PCollectionView<T> view) {
-                       throw new RuntimeException("sideInput() is not 
supported in Streaming mode.");
-               }
-
-               @Override
-               public void output(OUTDF output) {
-                       outputWithTimestamp(output, 
this.element.getTimestamp());
-               }
-
-               @Override
-               public void outputWithTimestamp(OUTDF output, Instant 
timestamp) {
-                       outputWithTimestampHelper(element, output, timestamp, 
collector);
-               }
-
-               @Override
-               public <T> void sideOutput(TupleTag<T> tag, T output) {
-                       sideOutputWithTimestamp(tag, output, 
this.element.getTimestamp());
-               }
-
-               @Override
-               public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T 
output, Instant timestamp) {
-                       sideOutputWithTimestampHelper(element, output, 
timestamp, collector, tag);
-               }
-
-               @Override
-               protected <AggInputT, AggOutputT> Aggregator<AggInputT, 
AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, 
?, AggOutputT> combiner) {
-                       Accumulator acc = 
getRuntimeContext().getAccumulator(name);
-                       if (acc != null) {
-                               AccumulatorHelper.compareAccumulatorTypes(name,
-                                               
SerializableFnAggregatorWrapper.class, acc.getClass());
-                               return (Aggregator<AggInputT, AggOutputT>) acc;
-                       }
-
-                       SerializableFnAggregatorWrapper<AggInputT, AggOutputT> 
accumulator =
-                                       new 
SerializableFnAggregatorWrapper<>(combiner);
-                       getRuntimeContext().addAccumulator(name, accumulator);
-                       return accumulator;
-               }
-       }
-
-       protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) 
{
-               if 
(timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
-                       throw new IllegalArgumentException(String.format(
-                                       "Cannot output with timestamp %s. 
Output timestamps must be no earlier than the "
-                                                       + "timestamp of the 
current input (%s) minus the allowed skew (%s). See the "
-                                                       + 
"DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed 
skew.",
-                                       timestamp, ref.getTimestamp(),
-                                       
PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
-               }
-       }
-
-       protected <T> WindowedValue<T> makeWindowedValue(
-                       T output, Instant timestamp, Collection<? extends 
BoundedWindow> windows, PaneInfo pane) {
-               final Instant inputTimestamp = timestamp;
-               final WindowFn windowFn = windowingStrategy.getWindowFn();
-
-               if (timestamp == null) {
-                       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-               }
-
-               if (windows == null) {
-                       try {
-                               windows = windowFn.assignWindows(windowFn.new 
AssignContext() {
-                                       @Override
-                                       public Object element() {
-                                               throw new 
UnsupportedOperationException(
-                                                               "WindowFn 
attempted to access input element when none was available");
-                                       }
-
-                                       @Override
-                                       public Instant timestamp() {
-                                               if (inputTimestamp == null) {
-                                                       throw new 
UnsupportedOperationException(
-                                                                       
"WindowFn attempted to access input timestamp when none was available");
-                                               }
-                                               return inputTimestamp;
-                                       }
-
-                                       @Override
-                                       public Collection<? extends 
BoundedWindow> windows() {
-                                               throw new 
UnsupportedOperationException(
-                                                               "WindowFn 
attempted to access input windows when none were available");
-                                       }
-                               });
-                       } catch (Exception e) {
-                               throw UserCodeException.wrap(e);
-                       }
-               }
-
-               return WindowedValue.of(output, timestamp, windows, pane);
-       }
-
-       ///////////                     ABSTRACT METHODS TO BE IMPLEMENTED BY 
SUBCLASSES                        /////////////////
-
-       public abstract void outputWithTimestampHelper(
-                       WindowedValue<IN> inElement,
-                       OUTDF output,
-                       Instant timestamp,
-                       Collector<WindowedValue<OUTFL>> outCollector);
-
-       public abstract <T> void sideOutputWithTimestampHelper(
-                       WindowedValue<IN> inElement,
-                       T output,
-                       Instant timestamp,
-                       Collector<WindowedValue<OUTFL>> outCollector,
-                       TupleTag<T> tag);
-
-       public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
-                       WindowedValue<IN> inElement,
-                       Collector<WindowedValue<OUTFL>> outCollector);
+  private final DoFn<IN, OUTDF> doFn;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private transient PipelineOptions options;
+
+  private DoFnProcessContext context;
+
+  public FlinkAbstractParDoWrapper(PipelineOptions options, 
WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUTDF> doFn) {
+    Preconditions.checkNotNull(options);
+    Preconditions.checkNotNull(windowingStrategy);
+    Preconditions.checkNotNull(doFn);
+
+    this.doFn = doFn;
+    this.options = options;
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  private void initContext(DoFn<IN, OUTDF> function, 
Collector<WindowedValue<OUTFL>> outCollector) {
+    if (this.context == null) {
+      this.context = new DoFnProcessContext(function, outCollector);
+    }
+  }
+
+  @Override
+  public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> 
out) throws Exception {
+    this.initContext(doFn, out);
+
+    // for each window the element belongs to, create a new copy here.
+    Collection<? extends BoundedWindow> windows = value.getWindows();
+    if (windows.size() <= 1) {
+      processElement(value);
+    } else {
+      for (BoundedWindow window : windows) {
+        processElement(WindowedValue.of(
+            value.getValue(), value.getTimestamp(), window, value.getPane()));
+      }
+    }
+  }
+
+  private void processElement(WindowedValue<IN> value) throws Exception {
+    this.context.setElement(value);
+    this.doFn.startBundle(context);
+    doFn.processElement(context);
+    this.doFn.finishBundle(context);
+  }
+
+  private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
+
+    private final DoFn<IN, OUTDF> fn;
+
+    protected final Collector<WindowedValue<OUTFL>> collector;
+
+    private WindowedValue<IN> element;
+
+    private DoFnProcessContext(DoFn<IN, OUTDF> function, 
Collector<WindowedValue<OUTFL>> outCollector) {
+      function.super();
+      super.setupDelegateAggregators();
+
+      this.fn = function;
+      this.collector = outCollector;
+    }
+
+    public void setElement(WindowedValue<IN> value) {
+      this.element = value;
+    }
+
+    @Override
+    public IN element() {
+      return this.element.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return this.element.getTimestamp();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      if (!(fn instanceof DoFn.RequiresWindowAccess)) {
+        throw new UnsupportedOperationException(
+            "window() is only available in the context of a DoFn marked as 
RequiresWindow.");
+      }
+
+      Collection<? extends BoundedWindow> windows = this.element.getWindows();
+      if (windows.size() != 1) {
+        throw new IllegalArgumentException("Each element is expected to belong 
to 1 window. " +
+            "This belongs to " + windows.size() + ".");
+      }
+      return windows.iterator().next();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return this.element.getPane();
+    }
+
+    @Override
+    public WindowingInternals<IN, OUTDF> windowingInternals() {
+      return windowingInternalsHelper(element, collector);
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming 
mode.");
+    }
+
+    @Override
+    public void output(OUTDF output) {
+      outputWithTimestamp(output, this.element.getTimestamp());
+    }
+
+    @Override
+    public void outputWithTimestamp(OUTDF output, Instant timestamp) {
+      outputWithTimestampHelper(element, output, timestamp, collector);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      sideOutputWithTimestamp(tag, output, this.element.getTimestamp());
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      sideOutputWithTimestampHelper(element, output, timestamp, collector, 
tag);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
+      Accumulator acc = getRuntimeContext().getAccumulator(name);
+      if (acc != null) {
+        AccumulatorHelper.compareAccumulatorTypes(name,
+            SerializableFnAggregatorWrapper.class, acc.getClass());
+        return (Aggregator<AggInputT, AggOutputT>) acc;
+      }
+
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+          new SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, accumulator);
+      return accumulator;
+    }
+  }
+
+  protected void checkTimestamp(WindowedValue<IN> ref, Instant timestamp) {
+    if 
(timestamp.isBefore(ref.getTimestamp().minus(doFn.getAllowedTimestampSkew()))) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
+              + "timestamp of the current input (%s) minus the allowed skew 
(%s). See the "
+              + "DoFn#getAllowedTimestmapSkew() Javadoc for details on 
changing the allowed skew.",
+          timestamp, ref.getTimestamp(),
+          
PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod())));
+    }
+  }
+
+  protected <T> WindowedValue<T> makeWindowedValue(
+      T output, Instant timestamp, Collection<? extends BoundedWindow> 
windows, PaneInfo pane) {
+    final Instant inputTimestamp = timestamp;
+    final WindowFn windowFn = windowingStrategy.getWindowFn();
+
+    if (timestamp == null) {
+      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    if (windows == null) {
+      try {
+        windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public Object element() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input element when none was 
available");
+          }
+
+          @Override
+          public Instant timestamp() {
+            if (inputTimestamp == null) {
+              throw new UnsupportedOperationException(
+                  "WindowFn attempted to access input timestamp when none was 
available");
+            }
+            return inputTimestamp;
+          }
+
+          @Override
+          public Collection<? extends BoundedWindow> windows() {
+            throw new UnsupportedOperationException(
+                "WindowFn attempted to access input windows when none were 
available");
+          }
+        });
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    return WindowedValue.of(output, timestamp, windows, pane);
+  }
+
+  ///////////      ABSTRACT METHODS TO BE IMPLEMENTED BY SUBCLASSES      
/////////////////
+
+  public abstract void outputWithTimestampHelper(
+      WindowedValue<IN> inElement,
+      OUTDF output,
+      Instant timestamp,
+      Collector<WindowedValue<OUTFL>> outCollector);
+
+  public abstract <T> void sideOutputWithTimestampHelper(
+      WindowedValue<IN> inElement,
+      T output,
+      Instant timestamp,
+      Collector<WindowedValue<OUTFL>> outCollector,
+      TupleTag<T> tag);
+
+  public abstract WindowingInternals<IN, OUTDF> windowingInternalsHelper(
+      WindowedValue<IN> inElement,
+      Collector<WindowedValue<OUTFL>> outCollector);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index fb3d329..55235c9 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -68,562 +68,562 @@ import java.util.*;
  * for furhter processing.
  */
 public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
-               extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
-               implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, 
WindowedValue<KV<K, VOUT>>> {
-
-       private static final long serialVersionUID = 1L;
-
-       private transient PipelineOptions options;
-
-       private transient CoderRegistry coderRegistry;
-
-       private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
-
-       private ProcessContext context;
-
-       private final WindowingStrategy<KV<K, VIN>, BoundedWindow> 
windowingStrategy;
-
-       private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
-
-       private final KvCoder<K, VIN> inputKvCoder;
-
-       /**
-        * State is kept <b>per-key</b>. This data structure keeps this mapping 
between an active key, i.e. a
-        * key whose elements are currently waiting to be processed, and its 
associated state.
-        */
-       private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new 
HashMap<>();
-
-       /**
-        * Timers waiting to be processed.
-        */
-       private Map<K, Set<TimerInternals.TimerData>> activeTimers = new 
HashMap<>();
-
-       private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
-
-       /**
-        * Creates an DataStream where elements are grouped in windows based on 
the specified windowing strategy.
-        * This method assumes that <b>elements are already grouped by key</b>.
-        * <p/>
-        * The difference with {@link #createForIterable(PipelineOptions, 
PCollection, KeyedStream)}
-        * is that this method assumes that a combiner function is provided
-        * (see {@link 
com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
-        * A combiner helps at increasing the speed and, in most of the cases, 
reduce the per-window state.
-        *
-        * @param options            the general job configuration options.
-        * @param input              the input Dataflow {@link 
com.google.cloud.dataflow.sdk.values.PCollection}.
-        * @param groupedStreamByKey the input stream, it is assumed to already 
be grouped by key.
-        * @param combiner           the combiner to be used.
-        * @param outputKvCoder      the type of the output values.
-        */
-       public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, 
VOUT>>> create(
-                       PipelineOptions options,
-                       PCollection input,
-                       KeyedStream<WindowedValue<KV<K, VIN>>, K> 
groupedStreamByKey,
-                       Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
-                       KvCoder<K, VOUT> outputKvCoder) {
-               Preconditions.checkNotNull(options);
-
-               KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) 
input.getCoder();
-               FlinkGroupAlsoByWindowWrapper windower = new 
FlinkGroupAlsoByWindowWrapper<>(options,
-                               input.getPipeline().getCoderRegistry(), 
input.getWindowingStrategy(), inputKvCoder, combiner);
-
-               Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = 
WindowedValue.FullWindowedValueCoder.of(
-                               outputKvCoder,
-                               
input.getWindowingStrategy().getWindowFn().windowCoder());
-
-               CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo 
=
-                               new 
CoderTypeInformation<>(windowedOutputElemCoder);
-
-               DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = 
groupedStreamByKey
-                               .transform("GroupByWindowWithCombiner",
-                                               new 
CoderTypeInformation<>(outputKvCoder),
-                                               windower)
-                               .returns(outputTypeInfo);
-
-               return groupedByKeyAndWindow;
-       }
-
-       /**
-        * Creates an DataStream where elements are grouped in windows based on 
the specified windowing strategy.
-        * This method assumes that <b>elements are already grouped by key</b>.
-        * <p/>
-        * The difference with {@link #create(PipelineOptions, PCollection, 
KeyedStream, Combine.KeyedCombineFn, KvCoder)}
-        * is that this method assumes no combiner function
-        * (see {@link 
com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
-        *
-        * @param options            the general job configuration options.
-        * @param input              the input Dataflow {@link 
com.google.cloud.dataflow.sdk.values.PCollection}.
-        * @param groupedStreamByKey the input stream, it is assumed to already 
be grouped by key.
-        */
-       public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> 
createForIterable(
-                       PipelineOptions options,
-                       PCollection input,
-                       KeyedStream<WindowedValue<KV<K, VIN>>, K> 
groupedStreamByKey) {
-               Preconditions.checkNotNull(options);
-
-               KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) 
input.getCoder();
-               Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-               Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
-
-               FlinkGroupAlsoByWindowWrapper windower = new 
FlinkGroupAlsoByWindowWrapper(options,
-                               input.getPipeline().getCoderRegistry(), 
input.getWindowingStrategy(), inputKvCoder, null);
-
-               Coder<Iterable<VIN>> valueIterCoder = 
IterableCoder.of(inputValueCoder);
-               KvCoder<K, Iterable<VIN>> outputElemCoder = 
KvCoder.of(keyCoder, valueIterCoder);
-
-               Coder<WindowedValue<KV<K, Iterable<VIN>>>> 
windowedOutputElemCoder = WindowedValue.FullWindowedValueCoder.of(
-                               outputElemCoder,
-                               
input.getWindowingStrategy().getWindowFn().windowCoder());
-
-               CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> 
outputTypeInfo =
-                               new 
CoderTypeInformation<>(windowedOutputElemCoder);
-
-               DataStream<WindowedValue<KV<K, Iterable<VIN>>>> 
groupedByKeyAndWindow = groupedStreamByKey
-                               .transform("GroupByWindow",
-                                               new 
CoderTypeInformation<>(windowedOutputElemCoder),
-                                               windower)
-                               .returns(outputTypeInfo);
-
-               return groupedByKeyAndWindow;
-       }
-
-       public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
-       createForTesting(PipelineOptions options,
-                        CoderRegistry registry,
-                        WindowingStrategy<KV<K, VIN>, BoundedWindow> 
windowingStrategy,
-                        KvCoder<K, VIN> inputCoder,
-                        Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
-               Preconditions.checkNotNull(options);
-
-               return new FlinkGroupAlsoByWindowWrapper(options, registry, 
windowingStrategy, inputCoder, combiner);
-       }
-
-       private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
-                                             CoderRegistry registry,
-                                             WindowingStrategy<KV<K, VIN>, 
BoundedWindow> windowingStrategy,
-                                             KvCoder<K, VIN> inputCoder,
-                                             Combine.KeyedCombineFn<K, VIN, 
VACC, VOUT> combiner) {
-               Preconditions.checkNotNull(options);
-
-               this.options = Preconditions.checkNotNull(options);
-               this.coderRegistry = Preconditions.checkNotNull(registry);
-               this.inputKvCoder = 
Preconditions.checkNotNull(inputCoder);//(KvCoder<K, VIN>) input.getCoder();
-               this.windowingStrategy = 
Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
-               this.combineFn = combiner;
-               this.operator = createGroupAlsoByWindowOperator();
-               this.chainingStrategy = ChainingStrategy.ALWAYS;
-       }
-
-       @Override
-       public void open() throws Exception {
-               super.open();
-               this.context = new ProcessContext(operator, new 
TimestampedCollector<>(output), this.timerInternals);
-       }
-
-       /**
-        * Create the adequate {@link 
com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
-        * <b> if not already created</b>.
-        * If a {@link 
com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, 
then
-        * a function with that combiner is created, so that elements are 
combined as they arrive. This is
-        * done for speed and (in most of the cases) for reduction of the 
per-window state.
-        */
-       private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, 
VOUT>> createGroupAlsoByWindowOperator() {
-               if (this.operator == null) {
-                       if (this.combineFn == null) {
-                               // Thus VOUT == Iterable<VIN>
-                               Coder<VIN> inputValueCoder = 
inputKvCoder.getValueCoder();
-
-                               this.operator = (DoFn) 
GroupAlsoByWindowViaWindowSetDoFn.create(
-                                               (WindowingStrategy<?, W>) 
this.windowingStrategy, SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
-                       } else {
-                               Coder<K> inputKeyCoder = 
inputKvCoder.getKeyCoder();
-
-                               AppliedCombineFn<K, VIN, VACC, VOUT> 
appliedCombineFn = AppliedCombineFn
-                                               .withInputCoder(combineFn, 
coderRegistry, inputKvCoder);
-
-                               this.operator = 
GroupAlsoByWindowViaWindowSetDoFn.create(
-                                               (WindowingStrategy<?, W>) 
this.windowingStrategy, SystemReduceFn.<K, VIN, VACC, VOUT, 
W>combining(inputKeyCoder, appliedCombineFn));
-                       }
-               }
-               return this.operator;
-       }
-
-       private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) 
throws Exception {
-               context.setElement(workItem, 
getStateInternalsForKey(workItem.key()));
-
-               // TODO: Ideally startBundle/finishBundle would be called when 
the operator is first used / about to be discarded.
-               operator.startBundle(context);
-               operator.processElement(context);
-               operator.finishBundle(context);
-       }
-
-       @Override
-       public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> 
element) throws Exception {
-               ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
-               
elements.add(WindowedValue.of(element.getValue().getValue().getValue(), 
element.getValue().getTimestamp(),
-                               element.getValue().getWindows(), 
element.getValue().getPane()));
-               
processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(),
 elements));
-       }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               context.setCurrentInputWatermark(new 
Instant(mark.getTimestamp()));
-
-               Multimap<K, TimerInternals.TimerData> timers = 
getTimersReadyToProcess(mark.getTimestamp());
-               if (!timers.isEmpty()) {
-                       for (K key : timers.keySet()) {
-                               processKeyedWorkItem(KeyedWorkItems.<K, 
VIN>timersWorkItem(key, timers.get(key)));
-                       }
-               }
-
-               /**
-                * This is to take into account the different semantics of the 
Watermark in Flink and
-                * in Dataflow. To understand the reasoning behind the Dataflow 
semantics and its
-                * watermark holding logic, see the documentation of
-                * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, 
boolean)}
-                * */
-               long millis = Long.MAX_VALUE;
-               for (FlinkStateInternals state : perKeyStateInternals.values()) 
{
-                       Instant watermarkHold = state.getWatermarkHold();
-                       if (watermarkHold != null && watermarkHold.getMillis() 
< millis) {
-                               millis = watermarkHold.getMillis();
-                       }
-               }
-
-               if (mark.getTimestamp() < millis) {
-                       millis = mark.getTimestamp();
-               }
-
-               context.setCurrentOutputWatermark(new Instant(millis));
-
-               // Don't forget to re-emit the watermark for further operators 
down the line.
-               // This is critical for jobs with multiple aggregation steps.
-               // Imagine a job with a groupByKey() on key K1, followed by a 
map() that changes
-               // the key K1 to K2, and another groupByKey() on K2. In this 
case, if the watermark
-               // is not re-emitted, the second aggregation would never be 
triggered, and no result
-               // will be produced.
-               output.emitWatermark(new Watermark(millis));
-       }
-
-       @Override
-       public void close() throws Exception {
-               super.close();
-       }
-
-       private void registerActiveTimer(K key, TimerInternals.TimerData timer) 
{
-               Set<TimerInternals.TimerData> timersForKey = 
activeTimers.get(key);
-               if (timersForKey == null) {
-                       timersForKey = new HashSet<>();
-               }
-               timersForKey.add(timer);
-               activeTimers.put(key, timersForKey);
-       }
-
-       private void unregisterActiveTimer(K key, TimerInternals.TimerData 
timer) {
-               Set<TimerInternals.TimerData> timersForKey = 
activeTimers.get(key);
-               if (timersForKey != null) {
-                       timersForKey.remove(timer);
-                       if (timersForKey.isEmpty()) {
-                               activeTimers.remove(key);
-                       } else {
-                               activeTimers.put(key, timersForKey);
-                       }
-               }
-       }
-
-       /**
-        * Returns the list of timers that are ready to fire. These are the 
timers
-        * that are registered to be triggered at a time before the current 
watermark.
-        * We keep these timers in a Set, so that they are deduplicated, as the 
same
-        * timer can be registered multiple times.
-        */
-       private Multimap<K, TimerInternals.TimerData> 
getTimersReadyToProcess(long currentWatermark) {
-
-               // we keep the timers to return in a different list and launch 
them later
-               // because we cannot prevent a trigger from registering another 
trigger,
-               // which would lead to concurrent modification exception.
-               Multimap<K, TimerInternals.TimerData> toFire = 
HashMultimap.create();
-
-               Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = 
activeTimers.entrySet().iterator();
-               while (it.hasNext()) {
-                       Map.Entry<K, Set<TimerInternals.TimerData>> 
keyWithTimers = it.next();
-
-                       Iterator<TimerInternals.TimerData> timerIt = 
keyWithTimers.getValue().iterator();
-                       while (timerIt.hasNext()) {
-                               TimerInternals.TimerData timerData = 
timerIt.next();
-                               if 
(timerData.getTimestamp().isBefore(currentWatermark)) {
-                                       toFire.put(keyWithTimers.getKey(), 
timerData);
-                                       timerIt.remove();
-                               }
-                       }
-
-                       if (keyWithTimers.getValue().isEmpty()) {
-                               it.remove();
-                       }
-               }
-               return toFire;
-       }
-
-       /**
-        * Gets the state associated with the specified key.
-        *
-        * @param key the key whose state we want.
-        * @return The {@link FlinkStateInternals}
-        * associated with that key.
-        */
-       private FlinkStateInternals<K> getStateInternalsForKey(K key) {
-               FlinkStateInternals<K> stateInternals = 
perKeyStateInternals.get(key);
-               if (stateInternals == null) {
-                       Coder<? extends BoundedWindow> windowCoder = 
this.windowingStrategy.getWindowFn().windowCoder();
-                       OutputTimeFn<? super BoundedWindow> outputTimeFn = 
this.windowingStrategy.getWindowFn().getOutputTimeFn();
-                       stateInternals = new FlinkStateInternals<>(key, 
inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
-                       perKeyStateInternals.put(key, stateInternals);
-               }
-               return stateInternals;
-       }
-
-       private class FlinkTimerInternals extends 
AbstractFlinkTimerInternals<K, VIN> {
-               @Override
-               public void setTimer(TimerData timerKey) {
-                       registerActiveTimer(context.element().key(), timerKey);
-               }
-
-               @Override
-               public void deleteTimer(TimerData timerKey) {
-                       unregisterActiveTimer(context.element().key(), 
timerKey);
-               }
-       }
-
-       private class ProcessContext extends 
GroupAlsoByWindowViaWindowSetDoFn<K, VIN, VOUT, ?, KeyedWorkItem<K, 
VIN>>.ProcessContext {
-
-               private final FlinkTimerInternals timerInternals;
-
-               private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> 
collector;
-
-               private FlinkStateInternals<K> stateInternals;
-
-               private KeyedWorkItem<K, VIN> element;
-
-               public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> 
function,
-                                     TimestampedCollector<WindowedValue<KV<K, 
VOUT>>> outCollector,
-                                     FlinkTimerInternals timerInternals) {
-                       function.super();
-                       super.setupDelegateAggregators();
-
-                       this.collector = 
Preconditions.checkNotNull(outCollector);
-                       this.timerInternals = 
Preconditions.checkNotNull(timerInternals);
-               }
-
-               public void setElement(KeyedWorkItem<K, VIN> element,
-                                      FlinkStateInternals<K> stateForKey) {
-                       this.element = element;
-                       this.stateInternals = stateForKey;
-               }
-
-               public void setCurrentInputWatermark(Instant watermark) {
-                       this.timerInternals.setCurrentInputWatermark(watermark);
-               }
-
-               public void setCurrentOutputWatermark(Instant watermark) {
-                       
this.timerInternals.setCurrentOutputWatermark(watermark);
-               }
-
-               @Override
-               public KeyedWorkItem<K, VIN> element() {
-                       return this.element;
-               }
-
-               @Override
-               public Instant timestamp() {
-                       throw new UnsupportedOperationException("timestamp() is 
not available when processing KeyedWorkItems.");
-               }
-
-               @Override
-               public PipelineOptions getPipelineOptions() {
-                       // TODO: PipelineOptions need to be available on the 
workers.
-                       // Ideally they are captured as part of the pipeline.
-                       // For now, construct empty options so that 
StateContexts.createFromComponents
-                       // will yield a valid StateContext, which is needed to 
support the StateContext.window().
-                       if (options == null) {
-                               options = new PipelineOptions() {
-                                       @Override
-                                       public <T extends PipelineOptions> T 
as(Class<T> kls) {
-                                               return null;
-                                       }
-
-                                       @Override
-                                       public <T extends PipelineOptions> T 
cloneAs(Class<T> kls) {
-                                               return null;
-                                       }
-
-                                       @Override
-                                       public Class<? extends 
PipelineRunner<?>> getRunner() {
-                                               return null;
-                                       }
-
-                                       @Override
-                                       public void setRunner(Class<? extends 
PipelineRunner<?>> kls) {
-
-                                       }
-
-                                       @Override
-                                       public CheckEnabled 
getStableUniqueNames() {
-                                               return null;
-                                       }
-
-                                       @Override
-                                       public void 
setStableUniqueNames(CheckEnabled enabled) {
-                                       }
-                               };
-                       }
-                       return options;
-               }
-
-               @Override
-               public void output(KV<K, VOUT> output) {
-                       throw new UnsupportedOperationException(
-                                       "output() is not available when 
processing KeyedWorkItems.");
-               }
-
-               @Override
-               public void outputWithTimestamp(KV<K, VOUT> output, Instant 
timestamp) {
-                       throw new UnsupportedOperationException(
-                                       "outputWithTimestamp() is not available 
when processing KeyedWorkItems.");
-               }
-
-               @Override
-               public PaneInfo pane() {
-                       throw new UnsupportedOperationException("pane() is not 
available when processing KeyedWorkItems.");
-               }
-
-               @Override
-               public BoundedWindow window() {
-                       throw new UnsupportedOperationException(
-                                       "window() is not available when 
processing KeyedWorkItems.");
-               }
-
-               @Override
-               public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> 
windowingInternals() {
-                       return new WindowingInternals<KeyedWorkItem<K, VIN>, 
KV<K, VOUT>>() {
-
-                               @Override
-                               public 
com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() {
-                                       return stateInternals;
-                               }
-
-                               @Override
-                               public void outputWindowedValue(KV<K, VOUT> 
output, Instant timestamp, Collection<? extends BoundedWindow> windows, 
PaneInfo pane) {
-                                       // TODO: No need to represent timestamp 
twice.
-                                       
collector.setAbsoluteTimestamp(timestamp.getMillis());
-                                       
collector.collect(WindowedValue.of(output, timestamp, windows, pane));
-
-                               }
-
-                               @Override
-                               public TimerInternals timerInternals() {
-                                       return timerInternals;
-                               }
-
-                               @Override
-                               public Collection<? extends BoundedWindow> 
windows() {
-                                       throw new 
UnsupportedOperationException("windows() is not available in Streaming mode.");
-                               }
-
-                               @Override
-                               public PaneInfo pane() {
-                                       throw new 
UnsupportedOperationException("pane() is not available in Streaming mode.");
-                               }
-
-                               @Override
-                               public <T> void 
writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, 
Coder<T> elemCoder) throws IOException {
-                                       throw new 
RuntimeException("writePCollectionViewData() not available in Streaming mode.");
-                               }
-
-                               @Override
-                               public <T> T sideInput(PCollectionView<T> view, 
BoundedWindow mainInputWindow) {
-                                       throw new RuntimeException("sideInput() 
is not available in Streaming mode.");
-                               }
-                       };
-               }
-
-               @Override
-               public <T> T sideInput(PCollectionView<T> view) {
-                       throw new RuntimeException("sideInput() is not 
supported in Streaming mode.");
-               }
-
-               @Override
-               public <T> void sideOutput(TupleTag<T> tag, T output) {
-                       // ignore the side output, this can happen when a user 
does not register
-                       // side outputs but then outputs using a freshly 
created TupleTag.
-                       throw new RuntimeException("sideOutput() is not 
available when grouping by window.");
-               }
-
-               @Override
-               public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T 
output, Instant timestamp) {
-                       sideOutput(tag, output);
-               }
-
-               @Override
-               protected <AggInputT, AggOutputT> Aggregator<AggInputT, 
AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, 
?, AggOutputT> combiner) {
-                       Accumulator acc = 
getRuntimeContext().getAccumulator(name);
-                       if (acc != null) {
-                               AccumulatorHelper.compareAccumulatorTypes(name,
-                                               
SerializableFnAggregatorWrapper.class, acc.getClass());
-                               return (Aggregator<AggInputT, AggOutputT>) acc;
-                       }
-
-                       SerializableFnAggregatorWrapper<AggInputT, AggOutputT> 
accumulator =
-                                       new 
SerializableFnAggregatorWrapper<>(combiner);
-                       getRuntimeContext().addAccumulator(name, accumulator);
-                       return accumulator;
-               }
-       }
-
-       //////////////                          Checkpointing implementation    
                        ////////////////
-
-       @Override
-       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
-               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
-               AbstractStateBackend.CheckpointStateOutputView out = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-               StateCheckpointWriter writer = 
StateCheckpointWriter.create(out);
-               Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-               // checkpoint the timers
-               StateCheckpointUtils.encodeTimers(activeTimers, writer, 
keyCoder);
-
-               // checkpoint the state
-               StateCheckpointUtils.encodeState(perKeyStateInternals, writer, 
keyCoder);
-
-               // checkpoint the timerInternals
-               context.timerInternals.encodeTimerInternals(context, writer,
-                               inputKvCoder, 
windowingStrategy.getWindowFn().windowCoder());
-
-               taskState.setOperatorState(out.closeAndGetHandle());
-               return taskState;
-       }
-
-       @Override
-       public void restoreState(StreamTaskState taskState, long 
recoveryTimestamp) throws Exception {
-               super.restoreState(taskState, recoveryTimestamp);
-
-               final ClassLoader userClassloader = getUserCodeClassloader();
-
-               Coder<? extends BoundedWindow> windowCoder = 
this.windowingStrategy.getWindowFn().windowCoder();
-               Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-
-               @SuppressWarnings("unchecked")
-               StateHandle<DataInputView> inputState = 
(StateHandle<DataInputView>) taskState.getOperatorState();
-               DataInputView in = inputState.getState(userClassloader);
-               StateCheckpointReader reader = new StateCheckpointReader(in);
-
-               // restore the timers
-               this.activeTimers = StateCheckpointUtils.decodeTimers(reader, 
windowCoder, keyCoder);
-
-               // restore the state
-               this.perKeyStateInternals = StateCheckpointUtils.decodeState(
-                               reader, windowingStrategy.getOutputTimeFn(), 
keyCoder, windowCoder, userClassloader);
-
-               // restore the timerInternals.
-               this.timerInternals.restoreTimerInternals(reader, inputKvCoder, 
windowCoder);
-       }
+    extends AbstractStreamOperator<WindowedValue<KV<K, VOUT>>>
+    implements OneInputStreamOperator<WindowedValue<KV<K, VIN>>, 
WindowedValue<KV<K, VOUT>>> {
+
+  private static final long serialVersionUID = 1L;
+
+  private transient PipelineOptions options;
+
+  private transient CoderRegistry coderRegistry;
+
+  private DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> operator;
+
+  private ProcessContext context;
+
+  private final WindowingStrategy<KV<K, VIN>, BoundedWindow> windowingStrategy;
+
+  private final Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn;
+
+  private final KvCoder<K, VIN> inputKvCoder;
+
+  /**
+   * State is kept <b>per-key</b>. This data structure keeps this mapping 
between an active key, i.e. a
+   * key whose elements are currently waiting to be processed, and its 
associated state.
+   */
+  private Map<K, FlinkStateInternals<K>> perKeyStateInternals = new 
HashMap<>();
+
+  /**
+   * Timers waiting to be processed.
+   */
+  private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+
+  private FlinkTimerInternals timerInternals = new FlinkTimerInternals();
+
+  /**
+   * Creates an DataStream where elements are grouped in windows based on the 
specified windowing strategy.
+   * This method assumes that <b>elements are already grouped by key</b>.
+   * <p/>
+   * The difference with {@link #createForIterable(PipelineOptions, 
PCollection, KeyedStream)}
+   * is that this method assumes that a combiner function is provided
+   * (see {@link 
com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+   * A combiner helps at increasing the speed and, in most of the cases, 
reduce the per-window state.
+   *
+   * @param options            the general job configuration options.
+   * @param input              the input Dataflow {@link 
com.google.cloud.dataflow.sdk.values.PCollection}.
+   * @param groupedStreamByKey the input stream, it is assumed to already be 
grouped by key.
+   * @param combiner           the combiner to be used.
+   * @param outputKvCoder      the type of the output values.
+   */
+  public static <K, VIN, VACC, VOUT> DataStream<WindowedValue<KV<K, VOUT>>> 
create(
+      PipelineOptions options,
+      PCollection input,
+      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey,
+      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner,
+      KvCoder<K, VOUT> outputKvCoder) {
+    Preconditions.checkNotNull(options);
+
+    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+    FlinkGroupAlsoByWindowWrapper windower = new 
FlinkGroupAlsoByWindowWrapper<>(options,
+        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), 
inputKvCoder, combiner);
+
+    Coder<WindowedValue<KV<K, VOUT>>> windowedOutputElemCoder = 
WindowedValue.FullWindowedValueCoder.of(
+        outputKvCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    CoderTypeInformation<WindowedValue<KV<K, VOUT>>> outputTypeInfo =
+        new CoderTypeInformation<>(windowedOutputElemCoder);
+
+    DataStream<WindowedValue<KV<K, VOUT>>> groupedByKeyAndWindow = 
groupedStreamByKey
+        .transform("GroupByWindowWithCombiner",
+            new CoderTypeInformation<>(outputKvCoder),
+            windower)
+        .returns(outputTypeInfo);
+
+    return groupedByKeyAndWindow;
+  }
+
+  /**
+   * Creates an DataStream where elements are grouped in windows based on the 
specified windowing strategy.
+   * This method assumes that <b>elements are already grouped by key</b>.
+   * <p/>
+   * The difference with {@link #create(PipelineOptions, PCollection, 
KeyedStream, Combine.KeyedCombineFn, KvCoder)}
+   * is that this method assumes no combiner function
+   * (see {@link 
com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}).
+   *
+   * @param options            the general job configuration options.
+   * @param input              the input Dataflow {@link 
com.google.cloud.dataflow.sdk.values.PCollection}.
+   * @param groupedStreamByKey the input stream, it is assumed to already be 
grouped by key.
+   */
+  public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> 
createForIterable(
+      PipelineOptions options,
+      PCollection input,
+      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupedStreamByKey) {
+    Preconditions.checkNotNull(options);
+
+    KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) input.getCoder();
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+    Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+    FlinkGroupAlsoByWindowWrapper windower = new 
FlinkGroupAlsoByWindowWrapper(options,
+        input.getPipeline().getCoderRegistry(), input.getWindowingStrategy(), 
inputKvCoder, null);
+
+    Coder<Iterable<VIN>> valueIterCoder = IterableCoder.of(inputValueCoder);
+    KvCoder<K, Iterable<VIN>> outputElemCoder = KvCoder.of(keyCoder, 
valueIterCoder);
+
+    Coder<WindowedValue<KV<K, Iterable<VIN>>>> windowedOutputElemCoder = 
WindowedValue.FullWindowedValueCoder.of(
+        outputElemCoder,
+        input.getWindowingStrategy().getWindowFn().windowCoder());
+
+    CoderTypeInformation<WindowedValue<KV<K, Iterable<VIN>>>> outputTypeInfo =
+        new CoderTypeInformation<>(windowedOutputElemCoder);
+
+    DataStream<WindowedValue<KV<K, Iterable<VIN>>>> groupedByKeyAndWindow = 
groupedStreamByKey
+        .transform("GroupByWindow",
+            new CoderTypeInformation<>(windowedOutputElemCoder),
+            windower)
+        .returns(outputTypeInfo);
+
+    return groupedByKeyAndWindow;
+  }
+
+  public static <K, VIN, VACC, VOUT> FlinkGroupAlsoByWindowWrapper
+  createForTesting(PipelineOptions options,
+                   CoderRegistry registry,
+                   WindowingStrategy<KV<K, VIN>, BoundedWindow> 
windowingStrategy,
+                   KvCoder<K, VIN> inputCoder,
+                   Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combiner) {
+    Preconditions.checkNotNull(options);
+
+    return new FlinkGroupAlsoByWindowWrapper(options, registry, 
windowingStrategy, inputCoder, combiner);
+  }
+
+  private FlinkGroupAlsoByWindowWrapper(PipelineOptions options,
+                                        CoderRegistry registry,
+                                        WindowingStrategy<KV<K, VIN>, 
BoundedWindow> windowingStrategy,
+                                        KvCoder<K, VIN> inputCoder,
+                                        Combine.KeyedCombineFn<K, VIN, VACC, 
VOUT> combiner) {
+    Preconditions.checkNotNull(options);
+
+    this.options = Preconditions.checkNotNull(options);
+    this.coderRegistry = Preconditions.checkNotNull(registry);
+    this.inputKvCoder = Preconditions.checkNotNull(inputCoder);//(KvCoder<K, 
VIN>) input.getCoder();
+    this.windowingStrategy = 
Preconditions.checkNotNull(windowingStrategy);//input.getWindowingStrategy();
+    this.combineFn = combiner;
+    this.operator = createGroupAlsoByWindowOperator();
+    this.chainingStrategy = ChainingStrategy.ALWAYS;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    this.context = new ProcessContext(operator, new 
TimestampedCollector<>(output), this.timerInternals);
+  }
+
+  /**
+   * Create the adequate {@link 
com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn},
+   * <b> if not already created</b>.
+   * If a {@link 
com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, 
then
+   * a function with that combiner is created, so that elements are combined 
as they arrive. This is
+   * done for speed and (in most of the cases) for reduction of the per-window 
state.
+   */
+  private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> 
createGroupAlsoByWindowOperator() {
+    if (this.operator == null) {
+      if (this.combineFn == null) {
+        // Thus VOUT == Iterable<VIN>
+        Coder<VIN> inputValueCoder = inputKvCoder.getValueCoder();
+
+        this.operator = (DoFn) GroupAlsoByWindowViaWindowSetDoFn.create(
+            (WindowingStrategy<?, W>) this.windowingStrategy, 
SystemReduceFn.<K, VIN, W>buffering(inputValueCoder));
+      } else {
+        Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder();
+
+        AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = 
AppliedCombineFn
+            .withInputCoder(combineFn, coderRegistry, inputKvCoder);
+
+        this.operator = GroupAlsoByWindowViaWindowSetDoFn.create(
+            (WindowingStrategy<?, W>) this.windowingStrategy, 
SystemReduceFn.<K, VIN, VACC, VOUT, W>combining(inputKeyCoder, 
appliedCombineFn));
+      }
+    }
+    return this.operator;
+  }
+
+  private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws 
Exception {
+    context.setElement(workItem, getStateInternalsForKey(workItem.key()));
+
+    // TODO: Ideally startBundle/finishBundle would be called when the 
operator is first used / about to be discarded.
+    operator.startBundle(context);
+    operator.processElement(context);
+    operator.finishBundle(context);
+  }
+
+  @Override
+  public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) 
throws Exception {
+    ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
+    elements.add(WindowedValue.of(element.getValue().getValue().getValue(), 
element.getValue().getTimestamp(),
+        element.getValue().getWindows(), element.getValue().getPane()));
+    
processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(),
 elements));
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    context.setCurrentInputWatermark(new Instant(mark.getTimestamp()));
+
+    Multimap<K, TimerInternals.TimerData> timers = 
getTimersReadyToProcess(mark.getTimestamp());
+    if (!timers.isEmpty()) {
+      for (K key : timers.keySet()) {
+        processKeyedWorkItem(KeyedWorkItems.<K, VIN>timersWorkItem(key, 
timers.get(key)));
+      }
+    }
+
+    /**
+     * This is to take into account the different semantics of the Watermark 
in Flink and
+     * in Dataflow. To understand the reasoning behind the Dataflow semantics 
and its
+     * watermark holding logic, see the documentation of
+     * {@link WatermarkHold#addHold(ReduceFn.ProcessValueContext, boolean)}
+     * */
+    long millis = Long.MAX_VALUE;
+    for (FlinkStateInternals state : perKeyStateInternals.values()) {
+      Instant watermarkHold = state.getWatermarkHold();
+      if (watermarkHold != null && watermarkHold.getMillis() < millis) {
+        millis = watermarkHold.getMillis();
+      }
+    }
+
+    if (mark.getTimestamp() < millis) {
+      millis = mark.getTimestamp();
+    }
+
+    context.setCurrentOutputWatermark(new Instant(millis));
+
+    // Don't forget to re-emit the watermark for further operators down the 
line.
+    // This is critical for jobs with multiple aggregation steps.
+    // Imagine a job with a groupByKey() on key K1, followed by a map() that 
changes
+    // the key K1 to K2, and another groupByKey() on K2. In this case, if the 
watermark
+    // is not re-emitted, the second aggregation would never be triggered, and 
no result
+    // will be produced.
+    output.emitWatermark(new Watermark(millis));
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+  }
+
+  private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey == null) {
+      timersForKey = new HashSet<>();
+    }
+    timersForKey.add(timer);
+    activeTimers.put(key, timersForKey);
+  }
+
+  private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    if (timersForKey != null) {
+      timersForKey.remove(timer);
+      if (timersForKey.isEmpty()) {
+        activeTimers.remove(key);
+      } else {
+        activeTimers.put(key, timersForKey);
+      }
+    }
+  }
+
+  /**
+   * Returns the list of timers that are ready to fire. These are the timers
+   * that are registered to be triggered at a time before the current 
watermark.
+   * We keep these timers in a Set, so that they are deduplicated, as the same
+   * timer can be registered multiple times.
+   */
+  private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long 
currentWatermark) {
+
+    // we keep the timers to return in a different list and launch them later
+    // because we cannot prevent a trigger from registering another trigger,
+    // which would lead to concurrent modification exception.
+    Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+
+    Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = 
activeTimers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+
+      Iterator<TimerInternals.TimerData> timerIt = 
keyWithTimers.getValue().iterator();
+      while (timerIt.hasNext()) {
+        TimerInternals.TimerData timerData = timerIt.next();
+        if (timerData.getTimestamp().isBefore(currentWatermark)) {
+          toFire.put(keyWithTimers.getKey(), timerData);
+          timerIt.remove();
+        }
+      }
+
+      if (keyWithTimers.getValue().isEmpty()) {
+        it.remove();
+      }
+    }
+    return toFire;
+  }
+
+  /**
+   * Gets the state associated with the specified key.
+   *
+   * @param key the key whose state we want.
+   * @return The {@link FlinkStateInternals}
+   * associated with that key.
+   */
+  private FlinkStateInternals<K> getStateInternalsForKey(K key) {
+    FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key);
+    if (stateInternals == null) {
+      Coder<? extends BoundedWindow> windowCoder = 
this.windowingStrategy.getWindowFn().windowCoder();
+      OutputTimeFn<? super BoundedWindow> outputTimeFn = 
this.windowingStrategy.getWindowFn().getOutputTimeFn();
+      stateInternals = new FlinkStateInternals<>(key, 
inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn);
+      perKeyStateInternals.put(key, stateInternals);
+    }
+    return stateInternals;
+  }
+
+  private class FlinkTimerInternals extends AbstractFlinkTimerInternals<K, 
VIN> {
+    @Override
+    public void setTimer(TimerData timerKey) {
+      registerActiveTimer(context.element().key(), timerKey);
+    }
+
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      unregisterActiveTimer(context.element().key(), timerKey);
+    }
+  }
+
+  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, 
VIN, VOUT, ?, KeyedWorkItem<K, VIN>>.ProcessContext {
+
+    private final FlinkTimerInternals timerInternals;
+
+    private final TimestampedCollector<WindowedValue<KV<K, VOUT>>> collector;
+
+    private FlinkStateInternals<K> stateInternals;
+
+    private KeyedWorkItem<K, VIN> element;
+
+    public ProcessContext(DoFn<KeyedWorkItem<K, VIN>, KV<K, VOUT>> function,
+                          TimestampedCollector<WindowedValue<KV<K, VOUT>>> 
outCollector,
+                          FlinkTimerInternals timerInternals) {
+      function.super();
+      super.setupDelegateAggregators();
+
+      this.collector = Preconditions.checkNotNull(outCollector);
+      this.timerInternals = Preconditions.checkNotNull(timerInternals);
+    }
+
+    public void setElement(KeyedWorkItem<K, VIN> element,
+                           FlinkStateInternals<K> stateForKey) {
+      this.element = element;
+      this.stateInternals = stateForKey;
+    }
+
+    public void setCurrentInputWatermark(Instant watermark) {
+      this.timerInternals.setCurrentInputWatermark(watermark);
+    }
+
+    public void setCurrentOutputWatermark(Instant watermark) {
+      this.timerInternals.setCurrentOutputWatermark(watermark);
+    }
+
+    @Override
+    public KeyedWorkItem<K, VIN> element() {
+      return this.element;
+    }
+
+    @Override
+    public Instant timestamp() {
+      throw new UnsupportedOperationException("timestamp() is not available 
when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      // TODO: PipelineOptions need to be available on the workers.
+      // Ideally they are captured as part of the pipeline.
+      // For now, construct empty options so that 
StateContexts.createFromComponents
+      // will yield a valid StateContext, which is needed to support the 
StateContext.window().
+      if (options == null) {
+        options = new PipelineOptions() {
+          @Override
+          public <T extends PipelineOptions> T as(Class<T> kls) {
+            return null;
+          }
+
+          @Override
+          public <T extends PipelineOptions> T cloneAs(Class<T> kls) {
+            return null;
+          }
+
+          @Override
+          public Class<? extends PipelineRunner<?>> getRunner() {
+            return null;
+          }
+
+          @Override
+          public void setRunner(Class<? extends PipelineRunner<?>> kls) {
+
+          }
+
+          @Override
+          public CheckEnabled getStableUniqueNames() {
+            return null;
+          }
+
+          @Override
+          public void setStableUniqueNames(CheckEnabled enabled) {
+          }
+        };
+      }
+      return options;
+    }
+
+    @Override
+    public void output(KV<K, VOUT> output) {
+      throw new UnsupportedOperationException(
+          "output() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public void outputWithTimestamp(KV<K, VOUT> output, Instant timestamp) {
+      throw new UnsupportedOperationException(
+          "outputWithTimestamp() is not available when processing 
KeyedWorkItems.");
+    }
+
+    @Override
+    public PaneInfo pane() {
+      throw new UnsupportedOperationException("pane() is not available when 
processing KeyedWorkItems.");
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "window() is not available when processing KeyedWorkItems.");
+    }
+
+    @Override
+    public WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>> 
windowingInternals() {
+      return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() {
+
+        @Override
+        public com.google.cloud.dataflow.sdk.util.state.StateInternals 
stateInternals() {
+          return stateInternals;
+        }
+
+        @Override
+        public void outputWindowedValue(KV<K, VOUT> output, Instant timestamp, 
Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+          // TODO: No need to represent timestamp twice.
+          collector.setAbsoluteTimestamp(timestamp.getMillis());
+          collector.collect(WindowedValue.of(output, timestamp, windows, 
pane));
+
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return timerInternals;
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          throw new UnsupportedOperationException("windows() is not available 
in Streaming mode.");
+        }
+
+        @Override
+        public PaneInfo pane() {
+          throw new UnsupportedOperationException("pane() is not available in 
Streaming mode.");
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(TupleTag<?> tag, 
Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+          throw new RuntimeException("writePCollectionViewData() not available 
in Streaming mode.");
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
+          throw new RuntimeException("sideInput() is not available in 
Streaming mode.");
+        }
+      };
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new RuntimeException("sideInput() is not supported in Streaming 
mode.");
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      // ignore the side output, this can happen when a user does not register
+      // side outputs but then outputs using a freshly created TupleTag.
+      throw new RuntimeException("sideOutput() is not available when grouping 
by window.");
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      sideOutput(tag, output);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
+      Accumulator acc = getRuntimeContext().getAccumulator(name);
+      if (acc != null) {
+        AccumulatorHelper.compareAccumulatorTypes(name,
+            SerializableFnAggregatorWrapper.class, acc.getClass());
+        return (Aggregator<AggInputT, AggOutputT>) acc;
+      }
+
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> accumulator =
+          new SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, accumulator);
+      return accumulator;
+    }
+  }
+
+  //////////////        Checkpointing implementation        ////////////////
+
+  @Override
+  public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
+    StreamTaskState taskState = super.snapshotOperatorState(checkpointId, 
timestamp);
+    AbstractStateBackend.CheckpointStateOutputView out = 
getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+    StateCheckpointWriter writer = StateCheckpointWriter.create(out);
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+    // checkpoint the timers
+    StateCheckpointUtils.encodeTimers(activeTimers, writer, keyCoder);
+
+    // checkpoint the state
+    StateCheckpointUtils.encodeState(perKeyStateInternals, writer, keyCoder);
+
+    // checkpoint the timerInternals
+    context.timerInternals.encodeTimerInternals(context, writer,
+        inputKvCoder, windowingStrategy.getWindowFn().windowCoder());
+
+    taskState.setOperatorState(out.closeAndGetHandle());
+    return taskState;
+  }
+
+  @Override
+  public void restoreState(StreamTaskState taskState, long recoveryTimestamp) 
throws Exception {
+    super.restoreState(taskState, recoveryTimestamp);
+
+    final ClassLoader userClassloader = getUserCodeClassloader();
+
+    Coder<? extends BoundedWindow> windowCoder = 
this.windowingStrategy.getWindowFn().windowCoder();
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+
+    @SuppressWarnings("unchecked")
+    StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) 
taskState.getOperatorState();
+    DataInputView in = inputState.getState(userClassloader);
+    StateCheckpointReader reader = new StateCheckpointReader(in);
+
+    // restore the timers
+    this.activeTimers = StateCheckpointUtils.decodeTimers(reader, windowCoder, 
keyCoder);
+
+    // restore the state
+    this.perKeyStateInternals = StateCheckpointUtils.decodeState(
+        reader, windowingStrategy.getOutputTimeFn(), keyCoder, windowCoder, 
userClassloader);
+
+    // restore the timerInternals.
+    this.timerInternals.restoreTimerInternals(reader, inputKvCoder, 
windowCoder);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
index 24f6d40..d01cf81 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -34,31 +34,31 @@ import 
org.apache.flink.streaming.api.datastream.KeyedStream;
  * */
 public class FlinkGroupByKeyWrapper {
 
-       /**
-        * Just an auxiliary interface to bypass the fact that java anonymous 
classes cannot implement
-        * multiple interfaces.
-        */
-       private interface KeySelectorWithQueryableResultType<K, V> extends 
KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
-       }
+  /**
+   * Just an auxiliary interface to bypass the fact that java anonymous 
classes cannot implement
+   * multiple interfaces.
+   */
+  private interface KeySelectorWithQueryableResultType<K, V> extends 
KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
+  }
 
-       public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> 
groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, 
KvCoder<K, V> inputKvCoder) {
-               final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-               final TypeInformation<K> keyTypeInfo = new 
CoderTypeInformation<>(keyCoder);
-               final boolean isKeyVoid = keyCoder instanceof VoidCoder;
+  public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> 
groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, 
KvCoder<K, V> inputKvCoder) {
+    final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+    final TypeInformation<K> keyTypeInfo = new 
CoderTypeInformation<>(keyCoder);
+    final boolean isKeyVoid = keyCoder instanceof VoidCoder;
 
-               return inputDataStream.keyBy(
-                               new KeySelectorWithQueryableResultType<K, V>() {
+    return inputDataStream.keyBy(
+        new KeySelectorWithQueryableResultType<K, V>() {
 
-                                       @Override
-                                       public K getKey(WindowedValue<KV<K, V>> 
value) throws Exception {
-                                               return isKeyVoid ? (K) 
VoidCoderTypeSerializer.VoidValue.INSTANCE :
-                                                               
value.getValue().getKey();
-                                       }
+          @Override
+          public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
+            return isKeyVoid ? (K) VoidCoderTypeSerializer.VoidValue.INSTANCE :
+                value.getValue().getKey();
+          }
 
-                                       @Override
-                                       public TypeInformation<K> 
getProducedType() {
-                                               return keyTypeInfo;
-                                       }
-                               });
-       }
+          @Override
+          public TypeInformation<K> getProducedType() {
+            return keyTypeInfo;
+          }
+        });
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
index d65cbc3..066a55c 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -33,43 +33,43 @@ import java.util.Map;
  * */
 public class FlinkParDoBoundMultiWrapper<IN, OUT> extends 
FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
 
-       private final TupleTag<?> mainTag;
-       private final Map<TupleTag<?>, Integer> outputLabels;
+  private final TupleTag<?> mainTag;
+  private final Map<TupleTag<?>, Integer> outputLabels;
 
-       public FlinkParDoBoundMultiWrapper(PipelineOptions options, 
WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> 
mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
-               super(options, windowingStrategy, doFn);
-               this.mainTag = Preconditions.checkNotNull(mainTag);
-               this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
-       }
+  public FlinkParDoBoundMultiWrapper(PipelineOptions options, 
WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> 
mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+    super(options, windowingStrategy, doFn);
+    this.mainTag = Preconditions.checkNotNull(mainTag);
+    this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
+  }
 
-       @Override
-       public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT 
output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
-               checkTimestamp(inElement, timestamp);
-               Integer index = outputLabels.get(mainTag);
-               collector.collect(makeWindowedValue(
-                               new RawUnionValue(index, output),
-                               timestamp,
-                               inElement.getWindows(),
-                               inElement.getPane()));
-       }
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT 
output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
+    checkTimestamp(inElement, timestamp);
+    Integer index = outputLabels.get(mainTag);
+    collector.collect(makeWindowedValue(
+        new RawUnionValue(index, output),
+        timestamp,
+        inElement.getWindows(),
+        inElement.getPane()));
+  }
 
-       @Override
-       public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> 
inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> 
collector, TupleTag<T> tag) {
-               checkTimestamp(inElement, timestamp);
-               Integer index = outputLabels.get(tag);
-               if (index != null) {
-                       collector.collect(makeWindowedValue(
-                                       new RawUnionValue(index, output),
-                                       timestamp,
-                                       inElement.getWindows(),
-                                       inElement.getPane()));
-               }
-       }
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T 
output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, 
TupleTag<T> tag) {
+    checkTimestamp(inElement, timestamp);
+    Integer index = outputLabels.get(tag);
+    if (index != null) {
+      collector.collect(makeWindowedValue(
+          new RawUnionValue(index, output),
+          timestamp,
+          inElement.getWindows(),
+          inElement.getPane()));
+    }
+  }
 
-       @Override
-       public WindowingInternals<IN, OUT> 
windowingInternalsHelper(WindowedValue<IN> inElement, 
Collector<WindowedValue<RawUnionValue>> outCollector) {
-               throw new RuntimeException("FlinkParDoBoundMultiWrapper is just 
an internal operator serving as " +
-                               "an intermediate transformation for the 
ParDo.BoundMulti translation. windowingInternals() " +
-                               "is not available in this class.");
-       }
+  @Override
+  public WindowingInternals<IN, OUT> 
windowingInternalsHelper(WindowedValue<IN> inElement, 
Collector<WindowedValue<RawUnionValue>> outCollector) {
+    throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an 
internal operator serving as " +
+        "an intermediate transformation for the ParDo.BoundMulti translation. 
windowingInternals() " +
+        "is not available in this class.");
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
index b0d8a76..b3a7090 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -35,64 +35,64 @@ import java.util.*;
  * */
 public class FlinkParDoBoundWrapper<IN, OUT> extends 
FlinkAbstractParDoWrapper<IN, OUT, OUT> {
 
-       public FlinkParDoBoundWrapper(PipelineOptions options, 
WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
-               super(options, windowingStrategy, doFn);
-       }
+  public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, 
?> windowingStrategy, DoFn<IN, OUT> doFn) {
+    super(options, windowingStrategy, doFn);
+  }
 
-       @Override
-       public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT 
output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
-               checkTimestamp(inElement, timestamp);
-               collector.collect(makeWindowedValue(
-                               output,
-                               timestamp,
-                               inElement.getWindows(),
-                               inElement.getPane()));
-       }
+  @Override
+  public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT 
output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
+    checkTimestamp(inElement, timestamp);
+    collector.collect(makeWindowedValue(
+        output,
+        timestamp,
+        inElement.getWindows(),
+        inElement.getPane()));
+  }
 
-       @Override
-       public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> 
inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> 
outCollector, TupleTag<T> tag) {
-               // ignore the side output, this can happen when a user does not 
register
-               // side outputs but then outputs using a freshly created 
TupleTag.
-               throw new RuntimeException("sideOutput() not not available in 
ParDo.Bound().");
-       }
+  @Override
+  public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T 
output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, 
TupleTag<T> tag) {
+    // ignore the side output, this can happen when a user does not register
+    // side outputs but then outputs using a freshly created TupleTag.
+    throw new RuntimeException("sideOutput() not not available in 
ParDo.Bound().");
+  }
 
-       @Override
-       public WindowingInternals<IN, OUT> windowingInternalsHelper(final 
WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
-               return new WindowingInternals<IN, OUT>() {
-                       @Override
-                       public StateInternals stateInternals() {
-                               throw new NullPointerException("StateInternals 
are not available for ParDo.Bound().");
-                       }
+  @Override
+  public WindowingInternals<IN, OUT> windowingInternalsHelper(final 
WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
+    return new WindowingInternals<IN, OUT>() {
+      @Override
+      public StateInternals stateInternals() {
+        throw new NullPointerException("StateInternals are not available for 
ParDo.Bound().");
+      }
 
-                       @Override
-                       public void outputWindowedValue(OUT output, Instant 
timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-                               collector.collect(makeWindowedValue(output, 
timestamp, windows, pane));
-                       }
+      @Override
+      public void outputWindowedValue(OUT output, Instant timestamp, 
Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+        collector.collect(makeWindowedValue(output, timestamp, windows, pane));
+      }
 
-                       @Override
-                       public TimerInternals timerInternals() {
-                               throw new NullPointerException("TimeInternals 
are not available for ParDo.Bound().");
-                       }
+      @Override
+      public TimerInternals timerInternals() {
+        throw new NullPointerException("TimeInternals are not available for 
ParDo.Bound().");
+      }
 
-                       @Override
-                       public Collection<? extends BoundedWindow> windows() {
-                               return inElement.getWindows();
-                       }
+      @Override
+      public Collection<? extends BoundedWindow> windows() {
+        return inElement.getWindows();
+      }
 
-                       @Override
-                       public PaneInfo pane() {
-                               return inElement.getPane();
-                       }
+      @Override
+      public PaneInfo pane() {
+        return inElement.getPane();
+      }
 
-                       @Override
-                       public <T> void writePCollectionViewData(TupleTag<?> 
tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-                               throw new 
RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
-                       }
+      @Override
+      public <T> void writePCollectionViewData(TupleTag<?> tag, 
Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+        throw new RuntimeException("writePCollectionViewData() not supported 
in Streaming mode.");
+      }
 
-                       @Override
-                       public <T> T sideInput(PCollectionView<T> view, 
BoundedWindow mainInputWindow) {
-                               throw new RuntimeException("sideInput() not 
implemented.");
-                       }
-               };
-       }
+      @Override
+      public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
+        throw new RuntimeException("sideInput() not implemented.");
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
index dc8e05a..39770c9 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -34,30 +34,30 @@ import java.util.List;
  */
 public class FlinkStreamingCreateFunction<IN, OUT> implements 
FlatMapFunction<IN, WindowedValue<OUT>> {
 
-       private final List<byte[]> elements;
-       private final Coder<OUT> coder;
-
-       public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> 
coder) {
-               this.elements = elements;
-               this.coder = coder;
-       }
-
-       @Override
-       public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws 
Exception {
-
-               @SuppressWarnings("unchecked")
-               OUT voidValue = (OUT) 
VoidCoderTypeSerializer.VoidValue.INSTANCE;
-               for (byte[] element : elements) {
-                       ByteArrayInputStream bai = new 
ByteArrayInputStream(element);
-                       OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-
-                       if (outValue == null) {
-                               out.collect(WindowedValue.of(voidValue, 
Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-                       } else {
-                               out.collect(WindowedValue.of(outValue, 
Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-                       }
-               }
-
-               out.close();
-       }
+  private final List<byte[]> elements;
+  private final Coder<OUT> coder;
+
+  public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) 
{
+    this.elements = elements;
+    this.coder = coder;
+  }
+
+  @Override
+  public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws 
Exception {
+
+    @SuppressWarnings("unchecked")
+    OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE;
+    for (byte[] element : elements) {
+      ByteArrayInputStream bai = new ByteArrayInputStream(element);
+      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
+
+      if (outValue == null) {
+        out.collect(WindowedValue.of(voidValue, Instant.now(), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+      } else {
+        out.collect(WindowedValue.of(outValue, Instant.now(), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+      }
+    }
+
+    out.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
index 699d256..4d6f4e2 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -31,50 +31,50 @@ import java.util.List;
  * */
 public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> 
extends UnboundedSource<T, C> {
 
-       private final PipelineOptions options;
-       private final RichParallelSourceFunction<T> flinkSource;
+  private final PipelineOptions options;
+  private final RichParallelSourceFunction<T> flinkSource;
 
-       public UnboundedFlinkSource(PipelineOptions pipelineOptions, 
RichParallelSourceFunction<T> source) {
-               
if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
-                       throw new RuntimeException("Flink Sources are supported 
only when running with the FlinkPipelineRunner.");
-               }
-               options = Preconditions.checkNotNull(pipelineOptions);
-               flinkSource = Preconditions.checkNotNull(source);
-               validate();
-       }
+  public UnboundedFlinkSource(PipelineOptions pipelineOptions, 
RichParallelSourceFunction<T> source) {
+    if(!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
+      throw new RuntimeException("Flink Sources are supported only when 
running with the FlinkPipelineRunner.");
+    }
+    options = Preconditions.checkNotNull(pipelineOptions);
+    flinkSource = Preconditions.checkNotNull(source);
+    validate();
+  }
 
-       public RichParallelSourceFunction<T> getFlinkSource() {
-               return this.flinkSource;
-       }
+  public RichParallelSourceFunction<T> getFlinkSource() {
+    return this.flinkSource;
+  }
 
-       @Override
-       public List<? extends UnboundedSource<T, C>> generateInitialSplits(int 
desiredNumSplits, PipelineOptions options) throws Exception {
-               throw new RuntimeException("Flink Sources are supported only 
when running with the FlinkPipelineRunner.");
-       }
+  @Override
+  public List<? extends UnboundedSource<T, C>> generateInitialSplits(int 
desiredNumSplits, PipelineOptions options) throws Exception {
+    throw new RuntimeException("Flink Sources are supported only when running 
with the FlinkPipelineRunner.");
+  }
 
-       @Override
-       public UnboundedReader<T> createReader(PipelineOptions options, 
@Nullable C checkpointMark) {
-               throw new RuntimeException("Flink Sources are supported only 
when running with the FlinkPipelineRunner.");
-       }
+  @Override
+  public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C 
checkpointMark) {
+    throw new RuntimeException("Flink Sources are supported only when running 
with the FlinkPipelineRunner.");
+  }
 
-       @Nullable
-       @Override
-       public Coder<C> getCheckpointMarkCoder() {
-               throw new RuntimeException("Flink Sources are supported only 
when running with the FlinkPipelineRunner.");
-       }
+  @Nullable
+  @Override
+  public Coder<C> getCheckpointMarkCoder() {
+    throw new RuntimeException("Flink Sources are supported only when running 
with the FlinkPipelineRunner.");
+  }
 
 
-       @Override
-       public void validate() {
-               Preconditions.checkNotNull(options);
-               Preconditions.checkNotNull(flinkSource);
-               if(!options.getRunner().equals(FlinkPipelineRunner.class)) {
-                       throw new RuntimeException("Flink Sources are supported 
only when running with the FlinkPipelineRunner.");
-               }
-       }
+  @Override
+  public void validate() {
+    Preconditions.checkNotNull(options);
+    Preconditions.checkNotNull(flinkSource);
+    if(!options.getRunner().equals(FlinkPipelineRunner.class)) {
+      throw new RuntimeException("Flink Sources are supported only when 
running with the FlinkPipelineRunner.");
+    }
+  }
 
-       @Override
-       public Coder<T> getDefaultOutputCoder() {
-               throw new RuntimeException("Flink Sources are supported only 
when running with the FlinkPipelineRunner.");
-       }
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    throw new RuntimeException("Flink Sources are supported only when running 
with the FlinkPipelineRunner.");
+  }
 }

Reply via email to