http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
new file mode 100644
index 0000000..123d5e7
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -0,0 +1,1044 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.runners.core.ElementAndRestriction;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains all the mappings between Beam and Flink
+ * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator}
+ * traverses the Beam job and comes here to translate the encountered Beam
+ * transformations into Flink ones, based on the mapping available in this class.
+ */
+class FlinkStreamingTransformTranslators {
+
+  // --------------------------------------------------------------------------------------------
+  //  Transform Translator Registry
+  // --------------------------------------------------------------------------------------------
+
+  @SuppressWarnings("rawtypes")
+  private static final Map<
+      Class<? extends PTransform>,
+      FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>();
+
+  // here you can find all the available translators.
+  static {
+    TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
+    TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
+    TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator());
+    TRANSLATORS.put(
+        SplittableParDo.ProcessElements.class, new SplittableProcessElementsStreamingTranslator());
+    TRANSLATORS.put(
+        SplittableParDo.GBKIntoKeyedWorkItems.class, new GBKIntoKeyedWorkItemsTranslator());
+
+
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
+    TRANSLATORS.put(
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
+        new CreateViewStreamingTranslator());
+
+    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorStreaming());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+  }
+
+  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
+      PTransform<?, ?> transform) {
+      PTransform<?, ?> transform) {
+    return TRANSLATORS.get(transform.getClass());
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Transformation Implementations
+  // --------------------------------------------------------------------------------------------
+
+  private static class TextIOWriteBoundStreamingTranslator
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound> {
+
+    private static final Logger LOG =
+        LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
+
+    @Override
+    public void translateNode(
+        TextIO.Write.Bound transform,
+        FlinkStreamingTranslationContext context) {
+      PValue input = context.getInput(transform);
+      DataStream<WindowedValue<String>> inputDataStream = context.getInputDataStream(input);
+
+      String filenamePrefix = transform.getFilenamePrefix();
+      String filenameSuffix = transform.getFilenameSuffix();
+      boolean needsValidation = transform.needsValidation();
+      int numShards = transform.getNumShards();
+      String shardNameTemplate = transform.getShardNameTemplate();
+
+      // TODO: Implement these. We need Flink support for this.
+      LOG.warn(
+          "Translation of TextIO.Write.needsValidation not yet supported. Is: 
{}.",
+          needsValidation);
+      LOG.warn(
+          "Translation of TextIO.Write.filenameSuffix not yet supported. Is: 
{}.",
+          filenameSuffix);
+      LOG.warn(
+          "Translation of TextIO.Write.shardNameTemplate not yet supported. 
Is: {}.",
+          shardNameTemplate);
+
+      DataStream<String> dataSink = inputDataStream
+          .flatMap(new FlatMapFunction<WindowedValue<String>, String>() {
+            @Override
+            public void flatMap(
+                WindowedValue<String> value,
+                Collector<String> out)
+                throws Exception {
+              out.collect(value.getValue());
+            }
+          });
+      DataStreamSink<String> output =
+          dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
+
+      if (numShards > 0) {
+        output.setParallelism(numShards);
+      }
+    }
+  }
+
+  private static class UnboundedReadSourceTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+
+    @Override
+    public void translateNode(
+        Read.Unbounded<T> transform,
+        FlinkStreamingTranslationContext context) {
+      PCollection<T> output = context.getOutput(transform);
+
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DataStream<WindowedValue<T>> source;
+      try {
+        UnboundedSourceWrapper<T, ?> sourceWrapper =
+            new UnboundedSourceWrapper<>(
+                context.getPipelineOptions(),
+                transform.getSource(),
+                context.getExecutionEnvironment().getParallelism());
+        source = context
+            .getExecutionEnvironment()
+            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Error while translating UnboundedSource: " + 
transform.getSource(), e);
+      }
+
+      context.setOutputDataStream(output, source);
+    }
+  }
+
+  private static class BoundedReadSourceTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+
+    @Override
+    public void translateNode(
+        Read.Bounded<T> transform,
+        FlinkStreamingTranslationContext context) {
+      PCollection<T> output = context.getOutput(transform);
+
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+
+      DataStream<WindowedValue<T>> source;
+      try {
+        BoundedSourceWrapper<T> sourceWrapper =
+            new BoundedSourceWrapper<>(
+                context.getPipelineOptions(),
+                transform.getSource(),
+                context.getExecutionEnvironment().getParallelism());
+        source = context
+            .getExecutionEnvironment()
+            .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Error while translating BoundedSource: " + transform.getSource(), 
e);
+      }
+
+      context.setOutputDataStream(output, source);
+    }
+  }
+
+  /**
+   * Wraps each element in a {@link RawUnionValue} with the given tag id.
+   */
+  private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
+    private final int intTag;
+
+    public ToRawUnion(int intTag) {
+      this.intTag = intTag;
+    }
+
+    @Override
+    public RawUnionValue map(T o) throws Exception {
+      return new RawUnionValue(intTag, o);
+    }
+  }
+
+  private static Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>>
+        transformSideInputs(
+          Collection<PCollectionView<?>> sideInputs,
+          FlinkStreamingTranslationContext context) {
+
+    // collect all side inputs
+    Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
+    Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
+    int count = 0;
+    for (PCollectionView<?> sideInput: sideInputs) {
+      TupleTag<?> tag = sideInput.getTagInternal();
+      intToViewMapping.put(count, sideInput);
+      tagToIntMapping.put(tag, count);
+      count++;
+      Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal();
+    }
+
+
+    List<Coder<?>> inputCoders = new ArrayList<>();
+    for (PCollectionView<?> sideInput: sideInputs) {
+      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+      TypeInformation<Object> tpe = sideInputStream.getType();
+      if (!(tpe instanceof CoderTypeInformation)) {
+        throw new IllegalStateException(
+            "Input Stream TypeInformation is no CoderTypeInformation.");
+      }
+
+      Coder<?> coder = ((CoderTypeInformation) tpe).getCoder();
+      inputCoders.add(coder);
+    }
+
+    UnionCoder unionCoder = UnionCoder.of(inputCoders);
+
+    CoderTypeInformation<RawUnionValue> unionTypeInformation =
+        new CoderTypeInformation<>(unionCoder);
+
+    // transform each side input to RawUnionValue and union them
+    DataStream<RawUnionValue> sideInputUnion = null;
+
+    for (PCollectionView<?> sideInput: sideInputs) {
+      TupleTag<?> tag = sideInput.getTagInternal();
+      final int intTag = tagToIntMapping.get(tag);
+      DataStream<Object> sideInputStream = context.getInputDataStream(sideInput);
+      DataStream<RawUnionValue> unionValueStream =
+          sideInputStream.map(new ToRawUnion<>(intTag)).returns(unionTypeInformation);
+
+      if (sideInputUnion == null) {
+        sideInputUnion = unionValueStream;
+      } else {
+        sideInputUnion = sideInputUnion.union(unionValueStream);
+      }
+    }
+
+    if (sideInputUnion == null) {
+      throw new IllegalStateException("No unioned side inputs, this indicates a bug.");
+    }
+
+    return new Tuple2<>(intToViewMapping, sideInputUnion);
+  }
+
+  /**
+   * Helper for translating {@link ParDo.MultiOutput} and {@link SplittableParDo.ProcessElements}.
+   */
+  static class ParDoTranslationHelper {
+
+    interface DoFnOperatorFactory<InputT, OutputT> {
+      DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+          DoFn<InputT, OutputT> doFn,
+          List<PCollectionView<?>> sideInputs,
+          TupleTag<OutputT> mainOutputTag,
+          List<TupleTag<?>> additionalOutputTags,
+          FlinkStreamingTranslationContext context,
+          WindowingStrategy<?, ?> windowingStrategy,
+          Map<TupleTag<?>, Integer> tagsToLabels,
+          Coder<WindowedValue<InputT>> inputCoder,
+          Coder keyCoder,
+          Map<Integer, PCollectionView<?>> transformedSideInputs);
+    }
+
+    static <InputT, OutputT> void translateParDo(
+        String transformName,
+        DoFn<InputT, OutputT> doFn,
+        PCollection<InputT> input,
+        List<PCollectionView<?>> sideInputs,
+        Map<TupleTag<?>, PValue> outputs,
+        TupleTag<OutputT> mainOutputTag,
+        List<TupleTag<?>> additionalOutputTags,
+        FlinkStreamingTranslationContext context,
+        DoFnOperatorFactory<InputT, OutputT> doFnOperatorFactory) {
+
+      // we assume that the transformation does not change the windowing strategy.
+      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+      Map<TupleTag<?>, Integer> tagsToLabels =
+          transformTupleTagsToLabels(mainOutputTag, outputs);
+
+      SingleOutputStreamOperator<RawUnionValue> unionOutputStream;
+
+      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(input);
+
+      DataStream<WindowedValue<InputT>> inputDataStream = context.getInputDataStream(input);
+
+      Coder keyCoder = null;
+      boolean stateful = false;
+      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+      if (signature.stateDeclarations().size() > 0
+          || signature.timerDeclarations().size() > 0) {
+        // Based on the fact that the signature is stateful, DoFnSignatures ensures
+        // that it is also keyed
+        keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
+        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
+        stateful = true;
+      } else if (doFn instanceof SplittableParDo.ProcessFn) {
+        // we know that it is keyed on String
+        keyCoder = StringUtf8Coder.of();
+        stateful = true;
+      }
+
+      if (sideInputs.isEmpty()) {
+        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+            doFnOperatorFactory.createDoFnOperator(
+                doFn,
+                sideInputs,
+                mainOutputTag,
+                additionalOutputTags,
+                context,
+                windowingStrategy,
+                tagsToLabels,
+                inputCoder,
+                keyCoder,
+                new HashMap<Integer, PCollectionView<?>>() /* side-input mapping */);
+
+        UnionCoder outputUnionCoder = createUnionCoder(outputs);
+
+        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+            new CoderTypeInformation<>(outputUnionCoder);
+
+        unionOutputStream = inputDataStream
+            .transform(transformName, outputUnionTypeInformation, doFnOperator);
+
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformedSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
+            doFnOperatorFactory.createDoFnOperator(
+                doFn,
+                sideInputs,
+                mainOutputTag,
+                additionalOutputTags,
+                context,
+                windowingStrategy,
+                tagsToLabels,
+                inputCoder,
+                keyCoder,
+                transformedSideInputs.f0);
+
+        UnionCoder outputUnionCoder = createUnionCoder(outputs);
+
+        CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
+            new CoderTypeInformation<>(outputUnionCoder);
+
+        if (stateful) {
+          // we have to manually construct the two-input transform because we're not
+          // allowed to have only one input keyed, normally.
+          KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
+          TwoInputTransformation<
+              WindowedValue<KV<?, InputT>>,
+              RawUnionValue,
+              WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
+              keyedStream.getTransformation(),
+              transformedSideInputs.f1.broadcast().getTransformation(),
+              transformName,
+              (TwoInputStreamOperator) doFnOperator,
+              outputUnionTypeInformation,
+              keyedStream.getParallelism());
+
+          rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
+          rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
+
+          unionOutputStream = new SingleOutputStreamOperator(
+                  keyedStream.getExecutionEnvironment(),
+                  rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+          keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+        } else {
+          unionOutputStream = inputDataStream
+              .connect(transformedSideInputs.f1.broadcast())
+              .transform(transformName, outputUnionTypeInformation, doFnOperator);
+        }
+      }
+
+      SplitStream<RawUnionValue> splitStream = unionOutputStream
+              .split(new OutputSelector<RawUnionValue>() {
+                @Override
+                public Iterable<String> select(RawUnionValue value) {
+                  return Collections.singletonList(Integer.toString(value.getUnionTag()));
+                }
+              });
+
+      for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
+        final int outputTag = tagsToLabels.get(output.getKey());
+
+        TypeInformation outputTypeInfo = context.getTypeInfo((PCollection<?>) output.getValue());
+
+        @SuppressWarnings("unchecked")
+        DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
+          .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+            @Override
+            public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+              out.collect(value.getValue());
+            }
+          }).returns(outputTypeInfo);
+
+        context.setOutputDataStream(output.getValue(), unwrapped);
+      }
+    }
+
+    private static Map<TupleTag<?>, Integer> transformTupleTagsToLabels(
+        TupleTag<?> mainTag,
+        Map<TupleTag<?>, PValue> allTaggedValues) {
+
+      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
+      int count = 0;
+      tagToLabelMap.put(mainTag, count++);
+      for (TupleTag<?> key : allTaggedValues.keySet()) {
+        if (!tagToLabelMap.containsKey(key)) {
+          tagToLabelMap.put(key, count++);
+        }
+      }
+      return tagToLabelMap;
+    }
+
+    private static UnionCoder createUnionCoder(Map<TupleTag<?>, PValue> taggedCollections) {
+      List<Coder<?>> outputCoders = Lists.newArrayList();
+      for (PValue taggedColl : taggedCollections.values()) {
+        checkArgument(
+            taggedColl instanceof PCollection,
+            "A Union Coder can only be created for a Collection of Tagged %s. 
Got %s",
+            PCollection.class.getSimpleName(),
+            taggedColl.getClass().getSimpleName());
+        PCollection<?> coll = (PCollection<?>) taggedColl;
+        WindowedValue.FullWindowedValueCoder<?> windowedValueCoder =
+            WindowedValue.getFullCoder(
+                coll.getCoder(),
+                coll.getWindowingStrategy().getWindowFn().windowCoder());
+        outputCoders.add(windowedValueCoder);
+      }
+      return UnionCoder.of(outputCoders);
+    }
+  }
+
+  private static class ParDoStreamingTranslator<InputT, OutputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      ParDo.MultiOutput<InputT, OutputT>> {
+
+    @Override
+    public void translateNode(
+        ParDo.MultiOutput<InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      ParDoTranslationHelper.translateParDo(
+          transform.getName(),
+          transform.getFn(),
+          (PCollection<InputT>) context.getInput(transform),
+          transform.getSideInputs(),
+          context.getOutputs(transform),
+          transform.getMainOutputTag(),
+          transform.getAdditionalOutputTags().getAll(),
+          context,
+          new ParDoTranslationHelper.DoFnOperatorFactory<InputT, OutputT>() {
+            @Override
+            public DoFnOperator<InputT, OutputT, RawUnionValue> createDoFnOperator(
+                DoFn<InputT, OutputT> doFn,
+                List<PCollectionView<?>> sideInputs,
+                TupleTag<OutputT> mainOutputTag,
+                List<TupleTag<?>> additionalOutputTags,
+                FlinkStreamingTranslationContext context,
+                WindowingStrategy<?, ?> windowingStrategy,
+                Map<TupleTag<?>, Integer> tagsToLabels,
+                Coder<WindowedValue<InputT>> inputCoder,
+                Coder keyCoder,
+                Map<Integer, PCollectionView<?>> transformedSideInputs) {
+              return new DoFnOperator<>(
+                  doFn,
+                  inputCoder,
+                  mainOutputTag,
+                  additionalOutputTags,
+                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                  windowingStrategy,
+                  transformedSideInputs,
+                  sideInputs,
+                  context.getPipelineOptions(),
+                  keyCoder);
+            }
+          });
+    }
+  }
+
+  private static class SplittableProcessElementsStreamingTranslator<
+      InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
+
+    @Override
+    public void translateNode(
+        SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      ParDoTranslationHelper.translateParDo(
+          transform.getName(),
+          transform.newProcessFn(transform.getFn()),
+          (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
+              context.getInput(transform),
+          transform.getSideInputs(),
+          context.getOutputs(transform),
+          transform.getMainOutputTag(),
+          transform.getAdditionalOutputTags().getAll(),
+          context,
+          new ParDoTranslationHelper.DoFnOperatorFactory<
+              KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() {
+            @Override
+            public DoFnOperator<
+                KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+                OutputT,
+                RawUnionValue> createDoFnOperator(
+                    DoFn<
+                        KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>,
+                        OutputT> doFn,
+                    List<PCollectionView<?>> sideInputs,
+                    TupleTag<OutputT> mainOutputTag,
+                    List<TupleTag<?>> additionalOutputTags,
+                    FlinkStreamingTranslationContext context,
+                    WindowingStrategy<?, ?> windowingStrategy,
+                    Map<TupleTag<?>, Integer> tagsToLabels,
+                    Coder<
+                        WindowedValue<
+                            KeyedWorkItem<
+                                String,
+                                ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
+                    Coder keyCoder,
+                    Map<Integer, PCollectionView<?>> transformedSideInputs) {
+              return new SplittableDoFnOperator<>(
+                  doFn,
+                  inputCoder,
+                  mainOutputTag,
+                  additionalOutputTags,
+                  new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
+                  windowingStrategy,
+                  transformedSideInputs,
+                  sideInputs,
+                  context.getPipelineOptions(),
+                  keyCoder);
+            }
+          });
+    }
+  }
+
+  private static class CreateViewStreamingTranslator<ElemT, ViewT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT>> {
+
+    @Override
+    public void translateNode(
+        FlinkStreamingViewOverrides.CreateFlinkPCollectionView<ElemT, ViewT> transform,
+        FlinkStreamingTranslationContext context) {
+      // just forward
+      DataStream<WindowedValue<List<ElemT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
+      PCollectionView<ViewT> view = context.getOutput(transform);
+
+      context.setOutputDataStream(view, inputDataSet);
+    }
+  }
+
+  private static class WindowAssignTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> {
+
+    @Override
+    public void translateNode(
+        Window.Assign<T> transform,
+        FlinkStreamingTranslationContext context) {
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<T, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<T, BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
+
+      TypeInformation<WindowedValue<T>> typeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DataStream<WindowedValue<T>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+
+      WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
+
+      FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+          new FlinkAssignWindows<>(windowFn);
+
+      SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream
+          .flatMap(assignWindowsFunction)
+          .name(context.getOutput(transform).getName())
+          .returns(typeInfo);
+
+      context.setOutputDataStream(context.getOutput(transform), outputDataStream);
+    }
+  }
+
+  private static class ReshuffleTranslatorStreaming<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Reshuffle<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Reshuffle<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataStream(context.getInput(transform));
+
+      context.setOutputDataStream(context.getOutput(transform), inputDataSet.rebalance());
+
+    }
+  }
+
+
+  private static class GroupByKeyTranslator<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        GroupByKey<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+      WindowedValue.
+          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+          WindowedValue.getFullCoder(
+              workItemCoder,
+              input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
+
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+      KeyedStream<
+          WindowedValue<
+              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, BoundedWindow> reduceFn =
+          SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+
+      TypeInformation<WindowedValue<KV<K, Iterable<InputT>>>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      DoFnOperator.DefaultOutputManagerFactory<
+            WindowedValue<KV<K, Iterable<InputT>>>> outputManagerFactory =
+          new DoFnOperator.DefaultOutputManagerFactory<>();
+
+      WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
+          new WindowDoFnOperator<>(
+              reduceFn,
+              (Coder) windowedWorkItemCoder,
+              new TupleTag<KV<K, Iterable<InputT>>>("main output"),
+              Collections.<TupleTag<?>>emptyList(),
+              outputManagerFactory,
+              windowingStrategy,
+              new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+              Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+              context.getPipelineOptions(),
+              inputKvCoder.getKeyCoder());
+
+      // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+      // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+      @SuppressWarnings("unchecked")
+      SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<InputT>>>> outDataStream =
+          keyedWorkItemStream
+              .transform(
+                  transform.getName(),
+                  outputTypeInfo,
+                  (OneInputStreamOperator) doFnOperator);
+
+      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+
+    }
+  }
+
+  private static class CombinePerKeyTranslator<K, InputT, OutputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      Combine.PerKey<K, InputT, OutputT>> {
+
+    @Override
+    boolean canTranslate(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      // if we have a merging window strategy and side inputs we cannot
+      // translate as a proper combine. We have to group and then run the combine
+      // over the final grouped values.
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      return windowingStrategy.getWindowFn().isNonMerging() || transform.getSideInputs().isEmpty();
+    }
+
+    @Override
+    public void translateNode(
+        Combine.PerKey<K, InputT, OutputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) input.getWindowingStrategy();
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+      WindowedValue.
+          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+            WindowedValue.getFullCoder(
+                workItemCoder,
+                input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
+
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+      KeyedStream<
+            WindowedValue<
+                SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+      SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = SystemReduceFn.combining(
+          inputKvCoder.getKeyCoder(),
+          AppliedCombineFn.withInputCoder(
+              transform.getFn(), input.getPipeline().getCoderRegistry(), inputKvCoder));
+
+      TypeInformation<WindowedValue<KV<K, OutputT>>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+      List<PCollectionView<?>> sideInputs = transform.getSideInputs();
+
+      if (sideInputs.isEmpty()) {
+
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                (Coder) windowedWorkItemCoder,
+                new TupleTag<KV<K, OutputT>>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                windowingStrategy,
+                new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+                Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder());
+
+        // our operator expects WindowedValue<KeyedWorkItem> while our input stream
+        // is WindowedValue<SingletonKeyedWorkItem>, which is fine but Java doesn't like it ...
+        @SuppressWarnings("unchecked")
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+            keyedWorkItemStream.transform(
+                transform.getName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      } else {
+        Tuple2<Map<Integer, PCollectionView<?>>, DataStream<RawUnionValue>> transformSideInputs =
+            transformSideInputs(sideInputs, context);
+
+        WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
+            new WindowDoFnOperator<>(
+                reduceFn,
+                (Coder) windowedWorkItemCoder,
+                new TupleTag<KV<K, OutputT>>("main output"),
+                Collections.<TupleTag<?>>emptyList(),
+                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
+                windowingStrategy,
+                transformSideInputs.f0,
+                sideInputs,
+                context.getPipelineOptions(),
+                inputKvCoder.getKeyCoder());
+
+        // we have to manually construct the two-input transform because we're not
+        // allowed to have only one input keyed, normally.
+
+        TwoInputTransformation<
+            WindowedValue<SingletonKeyedWorkItem<K, InputT>>,
+            RawUnionValue,
+            WindowedValue<KV<K, OutputT>>> rawFlinkTransform = new TwoInputTransformation<>(
+            keyedWorkItemStream.getTransformation(),
+            transformSideInputs.f1.broadcast().getTransformation(),
+            transform.getName(),
+            (TwoInputStreamOperator) doFnOperator,
+            outputTypeInfo,
+            keyedWorkItemStream.getParallelism());
+
+        rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
+        rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);
+
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> outDataStream =
+            new SingleOutputStreamOperator(
+                keyedWorkItemStream.getExecutionEnvironment(),
+                rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+        keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+        context.setOutputDataStream(context.getOutput(transform), outDataStream);
+      }
+    }
+  }
+
+  private static class GBKIntoKeyedWorkItemsTranslator<K, InputT>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      SplittableParDo.GBKIntoKeyedWorkItems<K, InputT>> {
+
+    @Override
+    boolean canTranslate(
+        SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+      return true;
+    }
+
+    @Override
+    public void translateNode(
+        SplittableParDo.GBKIntoKeyedWorkItems<K, InputT> transform,
+        FlinkStreamingTranslationContext context) {
+
+      PCollection<KV<K, InputT>> input = context.getInput(transform);
+
+      KvCoder<K, InputT> inputKvCoder = (KvCoder<K, InputT>) input.getCoder();
+
+      SingletonKeyedWorkItemCoder<K, InputT> workItemCoder = SingletonKeyedWorkItemCoder.of(
+          inputKvCoder.getKeyCoder(),
+          inputKvCoder.getValueCoder(),
+          input.getWindowingStrategy().getWindowFn().windowCoder());
+
+
+      WindowedValue.
+          FullWindowedValueCoder<SingletonKeyedWorkItem<K, InputT>> windowedWorkItemCoder =
+          WindowedValue.getFullCoder(
+              workItemCoder,
+              input.getWindowingStrategy().getWindowFn().windowCoder());
+
+      CoderTypeInformation<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemTypeInfo =
+          new CoderTypeInformation<>(windowedWorkItemCoder);
+
+      DataStream<WindowedValue<KV<K, InputT>>> inputDataStream = context.getInputDataStream(input);
+
+      DataStream<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> workItemStream =
+          inputDataStream
+              .flatMap(new ToKeyedWorkItem<K, InputT>())
+              .returns(workItemTypeInfo).name("ToKeyedWorkItem");
+
+      KeyedStream<
+          WindowedValue<
+              SingletonKeyedWorkItem<K, InputT>>, ByteBuffer> keyedWorkItemStream = workItemStream
+          .keyBy(new WorkItemKeySelector<K, InputT>(inputKvCoder.getKeyCoder()));
+
+      context.setOutputDataStream(context.getOutput(transform), keyedWorkItemStream);
+    }
+  }
+
+  private static class FlattenPCollectionTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+      Flatten.PCollections<T>> {
+
+    @Override
+    public void translateNode(
+        Flatten.PCollections<T> transform,
+        FlinkStreamingTranslationContext context) {
+      Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
+
+      if (allInputs.isEmpty()) {
+
+        // create an empty dummy source to satisfy downstream operations
+        // we cannot create an empty source in Flink, therefore we have to
+        // add the flatMap that simply never forwards the single element
+        DataStreamSource<String> dummySource =
+            context.getExecutionEnvironment().fromElements("dummy");
+
+        DataStream<WindowedValue<T>> result = dummySource.flatMap(
+            new FlatMapFunction<String, WindowedValue<T>>() {
+              @Override
+              public void flatMap(
+                  String s,
+                  Collector<WindowedValue<T>> collector) throws Exception {
+                // never return anything
+              }
+            }).returns(
+            new CoderTypeInformation<>(
+                WindowedValue.getFullCoder(
+                    (Coder<T>) VoidCoder.of(),
+                    GlobalWindow.Coder.INSTANCE)));
+        context.setOutputDataStream(context.getOutput(transform), result);
+
+      } else {
+        DataStream<T> result = null;
+        for (PValue input : allInputs.values()) {
+          DataStream<T> current = context.getInputDataStream(input);
+          result = (result == null) ? current : result.union(current);
+        }
+        context.setOutputDataStream(context.getOutput(transform), result);
+      }
+    }
+  }
+
+  private static class ToKeyedWorkItem<K, InputT>
+      extends RichFlatMapFunction<
+      WindowedValue<KV<K, InputT>>,
+      WindowedValue<SingletonKeyedWorkItem<K, InputT>>> {
+
+    @Override
+    public void flatMap(
+        WindowedValue<KV<K, InputT>> inWithMultipleWindows,
+        Collector<WindowedValue<SingletonKeyedWorkItem<K, InputT>>> out) throws Exception {
+
+      // we need to wrap each one work item per window for now
+      // since otherwise the PushbackSideInputRunner will not correctly
+      // determine whether side inputs are ready
+      //
+      // this is tracked as https://issues.apache.org/jira/browse/BEAM-1850
+      for (WindowedValue<KV<K, InputT>> in : inWithMultipleWindows.explodeWindows()) {
+        SingletonKeyedWorkItem<K, InputT> workItem =
+            new SingletonKeyedWorkItem<>(
+                in.getValue().getKey(),
+                in.withValue(in.getValue().getValue()));
+
+        out.collect(in.withValue(workItem));
+      }
+    }
+  }
+
+}
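
A note on the registry pattern above: FlinkStreamingPipelineTranslator looks up
each transform's concrete class in TRANSLATORS and delegates to the registered
StreamTransformTranslator, returning null for transforms it does not know. A
minimal, self-contained sketch of that class-keyed dispatch (the Translator and
TranslatorRegistry names below are hypothetical stand-ins, not part of this
commit):

    import java.util.HashMap;
    import java.util.Map;

    interface Translator<T> {
      void translate(T transform);
    }

    final class TranslatorRegistry {
      // Maps a transform's concrete class to its translator; populated once,
      // then queried for every node visited during pipeline traversal.
      private static final Map<Class<?>, Translator<?>> TRANSLATORS = new HashMap<>();

      static <T> void register(Class<T> clazz, Translator<T> translator) {
        TRANSLATORS.put(clazz, translator);
      }

      // Mirrors getTranslator(...) above: returns null for unknown classes,
      // so callers must treat a missing entry as "transform not supported".
      @SuppressWarnings("unchecked")
      static <T> Translator<T> lookup(T transform) {
        return (Translator<T>) TRANSLATORS.get(transform.getClass());
      }
    }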

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
new file mode 100644
index 0000000..1a943a3
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Helper for keeping track of which {@link DataStream DataStreams} map
+ * to which {@link PTransform PTransforms}.
+ */
+class FlinkStreamingTranslationContext {
+
+  private final StreamExecutionEnvironment env;
+  private final PipelineOptions options;
+
+  /**
+   * Keeps a mapping between the output value of the PTransform (in Dataflow) and the
+   * Flink Operator that produced it, after the translation of the corresponding PTransform
+   * to its Flink equivalent.
+   */
+  private final Map<PValue, DataStream<?>> dataStreams;
+
+  private AppliedPTransform<?, ?, ?> currentTransform;
+
+  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) {
+    this.env = checkNotNull(env);
+    this.options = checkNotNull(options);
+    this.dataStreams = new HashMap<>();
+  }
+
+  public StreamExecutionEnvironment getExecutionEnvironment() {
+    return env;
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> DataStream<T> getInputDataStream(PValue value) {
+    return (DataStream<T>) dataStreams.get(value);
+  }
+
+  public void setOutputDataStream(PValue value, DataStream<?> set) {
+    if (!dataStreams.containsKey(value)) {
+      dataStreams.put(value, set);
+    }
+  }
+
+  /**
+   * Sets the AppliedPTransform which carries input/output.
+   * @param currentTransform
+   */
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
+    this.currentTransform = currentTransform;
+  }
+
+  public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+
+    return WindowedValue.getFullCoder(
+        valueCoder,
+        collection.getWindowingStrategy().getWindowFn().windowCoder());
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.getFullCoder(
+            valueCoder,
+            collection.getWindowingStrategy().getWindowFn().windowCoder());
+
+    return new CoderTypeInformation<>(windowedValueCoder);
+  }
+
+
+  @SuppressWarnings("unchecked")
+  public <T extends PValue> T getInput(PTransform<T, ?> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
+  }
+
+  public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {
+    return currentTransform.getInputs();
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends PValue> T getOutput(PTransform<?, T> transform) {
+    return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values());
+  }
+
+  public <OutputT extends POutput> Map<TupleTag<?>, PValue> getOutputs(
+      PTransform<?, OutputT> transform) {
+    return currentTransform.getOutputs();
+  }
+
+}
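
A note on the context above: during translation every produced DataStream is
registered against the PValue it implements, and setOutputDataStream keeps only
the first stream registered for a given value. A minimal sketch of that
first-writer-wins mapping (MiniTranslationContext is a hypothetical stand-in,
not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    // Simplified stand-in for FlinkStreamingTranslationContext: it maps a
    // pipeline value to the stream that produces it.
    final class MiniTranslationContext<V, S> {
      private final Map<V, S> dataStreams = new HashMap<>();

      // Keep the first registration; later calls for the same value are
      // ignored, matching setOutputDataStream(...) above.
      void setOutputDataStream(V value, S stream) {
        dataStreams.putIfAbsent(value, stream);
      }

      S getInputDataStream(V value) {
        return dataStreams.get(value);
      }
    }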

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
new file mode 100644
index 0000000..f955f2a
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingViewOverrides.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Flink streaming overrides for various view (side input) transforms.
+ */
+class FlinkStreamingViewOverrides {
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+   * for the Flink runner in streaming mode.
+   */
+  static class StreamingViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    private final transient FlinkRunner runner;
+
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMap(FlinkRunner runner, View.AsMap<K, V> transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * View.AsMultimap View.AsMultimap} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+    private final transient FlinkRunner runner;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsMultimap(FlinkRunner runner, View.AsMultimap<K, V> transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsList View.AsList} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsList(FlinkRunner runner, View.AsList<T> transform) {}
+
+    @Override
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link View.AsIterable View.AsIterable} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsIterable(FlinkRunner runner, View.AsIterable<T> transform) {}
+
+    @Override
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateFlinkPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Specialized expansion for
+   * {@link View.AsSingleton View.AsSingleton} for the
+   * Flink runner in streaming mode.
+   */
+  static class StreamingViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+    private View.AsSingleton<T> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingViewAsSingleton(FlinkRunner runner, View.AsSingleton<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> expand(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using "
+            + "Combine.globally().asSingletonView() to combine the PCollection "
+            + "into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider calling withDefaultValue to provide a default value");
+        }
+      }
+    }
+  }
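
The two failure modes encoded in SingletonCombine mirror the contract of
singleton views. A minimal usage sketch, assuming Beam's Create and View
transforms (illustrative only, not part of this patch):

    Pipeline p = Pipeline.create(options);
    PCollectionView<Integer> view = p
        .apply(Create.of(42))
        .apply(View.<Integer>asSingleton().withDefaultValue(0));
    // An empty PCollection yields the default (0) via identity(); more than
    // one element makes apply(left, right) throw IllegalArgumentException
    // when the view is read.
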
+
+  static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+    public StreamingCombineGloballyAsSingletonView(
+        FlinkRunner runner,
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined,
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateFlinkPCollectionView.<OutputT, OutputT>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(Collections.singletonList(c.element()));
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require that the input {@link PCollection} fit in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
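
To make the in-memory behavior concrete, here is a sketch that exercises the
combiner directly, outside any pipeline (illustrative only):

    Concatenate<String> fn = new Concatenate<>();
    List<String> acc = fn.addInput(fn.createAccumulator(), "a");
    acc = fn.addInput(acc, "b");
    List<String> merged = fn.mergeAccumulators(
        Arrays.asList(acc, Collections.singletonList("c")));
    // merged is ["a", "b", "c"]; extractOutput returns the accumulator
    // unchanged, which is why the whole PCollection must fit in memory.
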
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+   */
+  public static class CreateFlinkPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateFlinkPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateFlinkPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateFlinkPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
new file mode 100644
index 0000000..3acc3ea
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationOptimizer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
+ */
+class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
+
+  private TranslationMode translationMode;
+
+  private final FlinkPipelineOptions options;
+
+  public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
+    this.translationMode = defaultMode;
+    this.options = options;
+  }
+
+  public TranslationMode getTranslationMode() {
+
+    // the streaming flag in the options overrides any detected translation mode
+    if (options.isStreaming()) {
+      return TranslationMode.STREAMING;
+    }
+
+    return translationMode;
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {}
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    Class<? extends PTransform> transformClass = node.getTransform().getClass();
+    if (transformClass == Read.Unbounded.class) {
+      LOG.info("Found {}. Switching to streaming execution.", transformClass);
+      translationMode = TranslationMode.STREAMING;
+    }
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {}
+}
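
A hedged sketch of how a runner might drive this visitor before translation,
assuming FlinkPipelineTranslator is a Pipeline visitor as its callback methods
suggest (the surrounding setup is illustrative, not part of the patch):

    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    PipelineTranslationOptimizer optimizer =
        new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
    pipeline.traverseTopologically(optimizer);
    TranslationMode mode = optimizer.getTranslationMode();
    // STREAMING if options.isStreaming() is set or a Read.Unbounded was
    // found during traversal; otherwise the BATCH default is kept.
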

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
new file mode 100644
index 0000000..8f50105
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.util.UserCodeException;
+
+/**
+ * Test Flink runner.
+ */
+public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
+
+  private FlinkRunner delegate;
+
+  private TestFlinkRunner(FlinkPipelineOptions options) {
+    // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
+    options.setFlinkMaster("[auto]");
+    this.delegate = FlinkRunner.fromOptions(options);
+  }
+
+  public static TestFlinkRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    return new TestFlinkRunner(flinkOptions);
+  }
+
+  public static TestFlinkRunner create(boolean streaming) {
+    FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    flinkOptions.setRunner(TestFlinkRunner.class);
+    flinkOptions.setStreaming(streaming);
+    return TestFlinkRunner.fromOptions(flinkOptions);
+  }
+
+  @Override
+  public PipelineResult run(Pipeline pipeline) {
+    try {
+      return delegate.run(pipeline);
+    } catch (Throwable t) {
+      // Special case hack to pull out assertion errors from PAssert; instead there should
+      // probably be a better story along the lines of UserCodeException.
+      UserCodeException innermostUserCodeException = null;
+      Throwable current = t;
+      for (; current.getCause() != null; current = current.getCause()) {
+        if (current instanceof UserCodeException) {
+          innermostUserCodeException = ((UserCodeException) current);
+        }
+      }
+      if (innermostUserCodeException != null) {
+        current = innermostUserCodeException.getCause();
+      }
+      if (current instanceof AssertionError) {
+        throw (AssertionError) current;
+      }
+      throw new PipelineExecutionException(current);
+    }
+  }
+
+  public PipelineOptions getPipelineOptions() {
+    return delegate.getPipelineOptions();
+  }
+}
+
+
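
The cause-unwrapping in run() exists so that a failing PAssert inside user code
surfaces as a plain AssertionError, which is what test frameworks expect. A
minimal sketch of the intended usage (the Create/PAssert specifics are
illustrative, not part of this patch):

    TestFlinkRunner runner = TestFlinkRunner.create(false /* batch mode */);
    Pipeline p = Pipeline.create(runner.getPipelineOptions());
    PAssert.that(p.apply(Create.of(1, 2, 3))).containsInAnyOrder(1, 2, 3);
    runner.run(p);  // a failing assertion propagates as AssertionError
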

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
new file mode 100644
index 0000000..ad54750
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+/**
+ * The translation mode of the Beam Pipeline.
+ */
+enum TranslationMode {
+
+  /** Uses the batch mode of Flink. */
+  BATCH,
+
+  /** Uses the streaming mode of Flink. */
+  STREAMING
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java
new file mode 100644
index 0000000..57f1e59
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
new file mode 100644
index 0000000..fb2493b
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An {@link AggregatorFactory} for the Flink Batch Runner.
+ */
+public class FlinkAggregatorFactory implements AggregatorFactory {
+
+  private final RuntimeContext runtimeContext;
+
+  public FlinkAggregatorFactory(RuntimeContext runtimeContext) {
+    this.runtimeContext = runtimeContext;
+  }
+
+  @Override
+  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+      Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
+      Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+    @SuppressWarnings("unchecked")
+    SerializableFnAggregatorWrapper<InputT, OutputT> result =
+        (SerializableFnAggregatorWrapper<InputT, OutputT>)
+            runtimeContext.getAccumulator(aggregatorName);
+
+    if (result == null) {
+      result = new SerializableFnAggregatorWrapper<>(combine);
+      runtimeContext.addAccumulator(aggregatorName, result);
+    }
+    return result;
+  }
+}
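
Each aggregator name maps to at most one Flink accumulator per operator
instance; repeated requests for the same name return the same wrapper. A hedged
sketch of a call site (MyDoFn and stepContext are placeholders, and this would
run inside a Flink rich function where getRuntimeContext() is available):

    FlinkAggregatorFactory factory =
        new FlinkAggregatorFactory(getRuntimeContext());
    Aggregator<Long, Long> counter = factory.createAggregatorForDoFn(
        MyDoFn.class, stepContext, "elementCount", Sum.ofLongs());
    counter.addValue(1L);  // forwarded to the backing Flink accumulator
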

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
new file mode 100644
index 0000000..447b1e5
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for
+ * Flink functions.
+ */
+class FlinkAssignContext<InputT, W extends BoundedWindow>
+    extends WindowFn<InputT, W>.AssignContext {
+  private final WindowedValue<InputT> value;
+
+  FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+    fn.super();
+    checkArgument(
+        Iterables.size(value.getWindows()) == 1,
+        String.format(
+            "%s passed to window assignment must be in a single window, but it was in %s: %s",
+            WindowedValue.class.getSimpleName(),
+            Iterables.size(value.getWindows()),
+            value.getWindows()));
+    this.value = value;
+  }
+
+  @Override
+  public InputT element() {
+    return value.getValue();
+  }
+
+  @Override
+  public Instant timestamp() {
+    return value.getTimestamp();
+  }
+
+  @Override
+  public BoundedWindow window() {
+    return Iterables.getOnlyElement(value.getWindows());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
new file mode 100644
index 0000000..c3a5095
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Flink {@link FlatMapFunction} for implementing
+ * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
+ */
+public class FlinkAssignWindows<T, W extends BoundedWindow>
+    implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+
+  private final WindowFn<T, W> windowFn;
+
+  public FlinkAssignWindows(WindowFn<T, W> windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  @Override
+  public void flatMap(
+      WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception {
+    Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input));
+    for (W window : windows) {
+      collector.collect(
+          WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane()));
+    }
+  }
+}
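
Since FlinkAssignContext requires each input to sit in exactly one window,
every incoming element is simply re-emitted once per assigned window. A sketch
of wiring the function into a batch DataSet (illustrative only; the actual
translation supplies an explicit CoderTypeInformation rather than a TypeHint):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    WindowFn<Object, IntervalWindow> windowFn =
        FixedWindows.of(Duration.standardMinutes(1));
    DataSet<WindowedValue<Object>> windowed = env
        .fromElements(WindowedValue.valueInGlobalWindow((Object) "hello"))
        .flatMap(new FlinkAssignWindows<>(windowFn))
        .returns(new TypeHint<WindowedValue<Object>>() {});
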

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
new file mode 100644
index 0000000..51582af
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+/**
+ * Encapsulates a {@link DoFn}
+ * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
+ *
+ * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
+ * and must tag all outputs with the output number. Afterwards, a filter removes the
+ * elements that do not belong to a given output.
+ */
+public class FlinkDoFnFunction<InputT, OutputT>
+    extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
+
+  private final SerializedPipelineOptions serializedOptions;
+
+  private final DoFn<InputT, OutputT> doFn;
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  private final WindowingStrategy<?, ?> windowingStrategy;
+
+  private final Map<TupleTag<?>, Integer> outputMap;
+  private final TupleTag<OutputT> mainOutputTag;
+
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  public FlinkDoFnFunction(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions options,
+      Map<TupleTag<?>, Integer> outputMap,
+      TupleTag<OutputT> mainOutputTag) {
+
+    this.doFn = doFn;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+    this.outputMap = outputMap;
+    this.mainOutputTag = mainOutputTag;
+
+  }
+
+  @Override
+  public void mapPartition(
+      Iterable<WindowedValue<InputT>> values,
+      Collector<WindowedValue<OutputT>> out) throws Exception {
+
+    RuntimeContext runtimeContext = getRuntimeContext();
+
+    DoFnRunners.OutputManager outputManager;
+    if (outputMap == null) {
+      outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
+    } else {
+      // it has some additional outputs
+      outputManager =
+          new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
+    }
+
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
+        serializedOptions.getPipelineOptions(), doFn,
+        new FlinkSideInputReader(sideInputs, runtimeContext),
+        outputManager,
+        mainOutputTag,
+        // see SimpleDoFnRunner; the empty list here just limits the number of additional outputs
+        Collections.<TupleTag<?>>emptyList(),
+        new FlinkNoOpStepContext(),
+        new FlinkAggregatorFactory(runtimeContext),
+        windowingStrategy);
+
+    doFnRunner.startBundle();
+
+    for (WindowedValue<InputT> value : values) {
+      doFnRunner.processElement(value);
+    }
+
+    doFnRunner.finishBundle();
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+  }
+
+  @Override
+  public void close() throws Exception {
+    doFnInvoker.invokeTeardown();
+  }
+
+  static class DoFnOutputManager
+      implements DoFnRunners.OutputManager {
+
+    private Collector collector;
+
+    DoFnOutputManager(Collector collector) {
+      this.collector = collector;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      collector.collect(output);
+    }
+  }
+
+  static class MultiDoFnOutputManager
+      implements DoFnRunners.OutputManager {
+
+    private Collector<WindowedValue<RawUnionValue>> collector;
+    private Map<TupleTag<?>, Integer> outputMap;
+
+    MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector,
+                           Map<TupleTag<?>, Integer> outputMap) {
+      this.collector = collector;
+      this.outputMap = outputMap;
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      collector.collect(WindowedValue.of(
+          new RawUnionValue(outputMap.get(tag), output.getValue()),
+          output.getTimestamp(), output.getWindows(), output.getPane()));
+    }
+  }
+
+}
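
The RawUnionValue tagging performed by MultiDoFnOutputManager is only half of
the mechanism: downstream, each additional output is recovered by filtering on
its union tag and unwrapping the value, as the class comment describes. A
hedged sketch of such a consumer-side filter (forTag is a hypothetical helper,
not part of this patch):

    static FilterFunction<WindowedValue<RawUnionValue>> forTag(final int tag) {
      return new FilterFunction<WindowedValue<RawUnionValue>>() {
        @Override
        public boolean filter(WindowedValue<RawUnionValue> value) {
          // keep only elements tagged for this logical output
          return value.getValue().getUnionTag() == tag;
        }
      };
    }
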
