[ https://issues.apache.org/jira/browse/BEAM-2930?focusedWorklogId=137935&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137935 ]

ASF GitHub Bot logged work on BEAM-2930:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Aug/18 18:58
            Start Date: 24/Aug/18 18:58
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6208: [BEAM-2930] Side input support for Flink portable streaming.
URL: https://github.com/apache/beam/pull/6208

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

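For orientation: this is the kind of pipeline the change enables on the
portable Flink streaming runner. The sketch below is illustrative only
(pipeline contents and names are hypothetical, not part of the diff):

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // The side input: materialized and broadcast to the consuming ParDo.
    final PCollectionView<Map<String, Integer>> sideView =
        p.apply("side", Create.of(KV.of("a", 1), KV.of("b", 2))).apply(View.asMap());
    p.apply("main", Create.of("a", "b"))
        .apply(
            "lookup",
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        // Look up the element in the broadcast side input map.
                        c.output(c.element() + "=" + c.sideInput(sideView).get(c.element()));
                      }
                    })
                .withSideInputs(sideView));
    p.run();
  }
}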

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index c488522aa64..c453329051d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -34,22 +34,22 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /** A {@link PCollectionView} created from the components of a {@link SideInput}. */
-class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> {
+public class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> {
   private final TupleTag<Iterable<WindowedValue<?>>> tag;
   private final ViewFn<Iterable<WindowedValue<?>>, T> viewFn;
   private final WindowMappingFn<?> windowMappingFn;
   private final @Nullable WindowingStrategy<?, ?> windowingStrategy;
-  private final @Nullable Coder<Iterable<WindowedValue<?>>> coder;
-  private final transient PCollection<?> pCollection;
+  private final @Nullable Coder<?> coder;
+  private final transient @Nullable PCollection<?> pCollection;
 
 /** Create a new {@link RunnerPCollectionView} from the provided components. */
-  RunnerPCollectionView(
-      PCollection<?> pCollection,
+  public RunnerPCollectionView(
+      @Nullable PCollection<?> pCollection,
       TupleTag<Iterable<WindowedValue<?>>> tag,
       ViewFn<Iterable<WindowedValue<?>>, T> viewFn,
       WindowMappingFn<?> windowMappingFn,
       @Nullable WindowingStrategy<?, ?> windowingStrategy,
-      @Nullable Coder<Iterable<WindowedValue<?>>> coder) {
+      @Nullable Coder<?> coder) {
     this.pCollection = pCollection;
     this.tag = tag;
     this.viewFn = viewFn;
@@ -84,7 +84,7 @@
   }
 
   @Override
-  public Coder<Iterable<WindowedValue<?>>> getCoderInternal() {
+  public Coder<?> getCoderInternal() {
     return coder;
   }
 
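Note on the change above: with the class and constructor now public, runner code
can construct a view without an SDK-side PCollection. A hedged sketch of such a
call (mirroring the streaming translator later in this diff; the variables are
assumed to be in scope):

    PCollectionView<?> view =
        new RunnerPCollectionView<>(
            null, // no SDK PCollection on the runner side
            new TupleTag<>(sideInputTag),
            viewFn,
            windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
            windowingStrategy,
            coder);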
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index e8c2b66ba42..1a6826d94b1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -38,7 +38,6 @@
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 
 /**
@@ -148,30 +147,46 @@ public void addSideInputValue(PCollectionView<?> sideInput, WindowedValue<Iterab
   @Nullable
   @Override
   public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    @SuppressWarnings("unchecked")
-    Coder<BoundedWindow> windowCoder =
-        (Coder<BoundedWindow>) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
-
-    StateTag<ValueState<Iterable<?>>> stateTag = sideInputContentsTags.get(view);
-
-    ValueState<Iterable<?>> state =
-        stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
 
-    // TODO: Add support for choosing which representation is contained based upon the
-    // side input materialization. We currently can assume that we always have a multimap
-    // materialization as that is the only supported type within the Java SDK.
-    @Nullable Iterable<KV<?, ?>> elements = (Iterable<KV<?, ?>>) state.read();
+    @Nullable Iterable<?> elements = getIterable(view, window);
 
     if (elements == null) {
       elements = Collections.emptyList();
     }
 
+    // TODO: Add support for choosing which representation is contained based upon the
+    // side input materialization. We currently can assume that we always have a multimap
+    // materialization as that is the only supported type within the Java SDK.
     ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
     Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
     return (T)
         viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements));
   }
 
+  /**
+   * Retrieve the value as written by {@link #addSideInputValue(PCollectionView, WindowedValue)},
+   * without applying the SDK specific {@link ViewFn}.
+   *
+   * @param view the side input view to read
+   * @param window the window for which to read the side input
+   * @param <T> the type of the view
+   * @return the raw materialized values, or null if the side input is not yet ready
+   */
+  @Nullable
+  public <T> Iterable<?> getIterable(PCollectionView<T> view, BoundedWindow window) {
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>) view.getWindowingStrategyInternal().getWindowFn().windowCoder();
+
+    StateTag<ValueState<Iterable<?>>> stateTag = sideInputContentsTags.get(view);
+
+    ValueState<Iterable<?>> state =
+        stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag);
+
+    // returns null when the side input is not ready
+    return state.read();
+  }
+
   @Override
   public boolean isReady(PCollectionView<?> sideInput, BoundedWindow window) {
     Set<BoundedWindow> readyWindows =
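The new getIterable accessor returns the raw materialized values written by
addSideInputValue, whereas get additionally applies the SDK's ViewFn. A minimal
sketch of the difference, assuming the runners-core InMemoryStateInternals test
utility and a view, window and windowed values already in scope:

    SideInputHandler handler =
        new SideInputHandler(Collections.singletonList(view), InMemoryStateInternals.forKey(null));
    handler.addSideInputValue(view, windowedValues); // WindowedValue<Iterable<?>>
    Iterable<?> raw = handler.getIterable(view, window); // raw values, null while not ready
    Object contents = handler.get(view, window);         // ViewFn applied (e.g. a multimap view)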
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 63b071a1cdd..41c66a151b4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -149,9 +149,6 @@ public static FlinkBatchPortablePipelineTranslator createTranslator() {
     translatorMap.put(
         PTransformTranslation.RESHUFFLE_URN,
         FlinkBatchPortablePipelineTranslator::translateReshuffle);
-    translatorMap.put(
-        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
-        FlinkBatchPortablePipelineTranslator::translateView);
     return new FlinkBatchPortablePipelineTranslator(translatorMap.build());
   }
 
@@ -242,17 +239,6 @@ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeli
     }
   }
 
-  private static <InputT> void translateView(
-      PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
-
-    DataSet<WindowedValue<InputT>> inputDataSet =
-        context.getDataSetOrThrow(
-            Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
-
-    context.addDataSet(
-        Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), inputDataSet);
-  }
-
   private static <K, V> void translateReshuffle(
       PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
     DataSet<WindowedValue<KV<K, V>>> inputDataSet =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 0a48edb8d6b..8afe49c2521 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -36,6 +37,7 @@
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.RunnerPCollectionView;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
@@ -58,16 +60,22 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -166,6 +174,7 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
         PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, this::translateAssignWindows);
     translatorMap.put(ExecutableStage.URN, this::translateExecutableStage);
     translatorMap.put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle);
+
     this.urnToTransformTranslator = translatorMap.build();
   }
 
@@ -270,6 +279,9 @@ public void flatMap(T t, Collector<T> collector) {
     RunnerApi.PTransform pTransform = pipeline.getComponents().getTransformsOrThrow(id);
     String inputPCollectionId = Iterables.getOnlyElement(pTransform.getInputsMap().values());
 
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(pipeline.getComponents());
+
     RunnerApi.WindowingStrategy windowingStrategyProto =
         pipeline
             .getComponents()
@@ -279,12 +291,6 @@ public void flatMap(T t, Collector<T> collector) {
                     .getPcollectionsOrThrow(inputPCollectionId)
                     .getWindowingStrategyId());
 
-    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
-        context.getDataStreamOrThrow(inputPCollectionId);
-
-    RehydratedComponents rehydratedComponents =
-        RehydratedComponents.forComponents(pipeline.getComponents());
-
     WindowingStrategy<?, ?> windowingStrategy;
     try {
       windowingStrategy =
@@ -298,6 +304,28 @@ public void flatMap(T t, Collector<T> collector) {
 
     WindowedValueCoder<KV<K, V>> windowedInputCoder =
         (WindowedValueCoder) instantiateCoder(inputPCollectionId, pipeline.getComponents());
+
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(inputPCollectionId);
+
+    SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> outputDataStream =
+        addGBK(
+            inputDataStream,
+            windowingStrategy,
+            windowedInputCoder,
+            pTransform.getUniqueName(),
+            context);
+
+    context.addDataStream(
+        Iterables.getOnlyElement(pTransform.getOutputsMap().values()), outputDataStream);
+  }
+
+  private <K, V> SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> addGBK(
+      DataStream<WindowedValue<KV<K, V>>> inputDataStream,
+      WindowingStrategy<?, ?> windowingStrategy,
+      WindowedValueCoder<KV<K, V>> windowedInputCoder,
+      String operatorName,
+      StreamingTranslationContext context) {
     KvCoder<K, V> inputElementCoder = (KvCoder<K, V>) windowedInputCoder.getValueCoder();
 
     SingletonKeyedWorkItemCoder<K, V> workItemCoder =
@@ -339,11 +367,10 @@ public void flatMap(T t, Collector<T> collector) {
 
     TupleTag<KV<K, Iterable<V>>> mainTag = new TupleTag<>("main output");
 
-    // TODO: remove non-portable operator re-use
     WindowDoFnOperator<K, V, Iterable<V>> doFnOperator =
         new WindowDoFnOperator<K, V, Iterable<V>>(
             reduceFn,
-            pTransform.getUniqueName(),
+            operatorName,
             (Coder) windowedWorkItemCoder,
             mainTag,
             Collections.emptyList(),
@@ -357,10 +384,9 @@ public void flatMap(T t, Collector<T> collector) {
 
     SingleOutputStreamOperator<WindowedValue<KV<K, Iterable<V>>>> outputDataStream =
         keyedWorkItemStream.transform(
-            pTransform.getUniqueName(), outputTypeInfo, (OneInputStreamOperator) doFnOperator);
+            operatorName, outputTypeInfo, (OneInputStreamOperator) doFnOperator);
 
-    context.addDataStream(
-        Iterables.getOnlyElement(pTransform.getOutputsMap().values()), outputDataStream);
+    return outputDataStream;
   }
 
   private void translateImpulse(
@@ -434,10 +460,12 @@ private void translateImpulse(
     }
 
     String inputPCollectionId = stagePayload.getInput();
-    // TODO: https://issues.apache.org/jira/browse/BEAM-2930
+    final TransformedSideInputs transformedSideInputs;
+
     if (stagePayload.getSideInputsCount() > 0) {
-      throw new UnsupportedOperationException(
-          "[BEAM-2930] streaming translator does not support side inputs: " + 
transform);
+      transformedSideInputs = transformSideInputs(stagePayload, components, 
context);
+    } else {
+      transformedSideInputs = new 
TransformedSideInputs(Collections.emptyMap(), null);
     }
 
     Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newLinkedHashMap();
@@ -470,8 +498,6 @@ private void translateImpulse(
     DataStream<WindowedValue<InputT>> inputDataStream =
         context.getDataStreamOrThrow(inputPCollectionId);
 
-    // TODO: coder for side input push back
-    final Coder<WindowedValue<InputT>> windowedInputCoder = null;
     CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation =
         (!outputs.isEmpty())
             ? new CoderTypeInformation(outputCoders.get(mainOutputTag.getId()))
@@ -488,26 +514,33 @@ private void translateImpulse(
         new DoFnOperator.MultiOutputOutputManagerFactory<>(
             mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds);
 
-    // TODO: side inputs
     DoFnOperator<InputT, OutputT> doFnOperator =
         new ExecutableStageDoFnOperator<>(
             transform.getUniqueName(),
-            windowedInputCoder,
+            null,
             null,
             Collections.emptyMap(),
             mainOutputTag,
             additionalOutputTags,
             outputManagerFactory,
-            Collections.emptyMap() /* sideInputTagMapping */,
-            Collections.emptyList() /* sideInputs */,
+            transformedSideInputs.unionTagToView,
+            new ArrayList<>(transformedSideInputs.unionTagToView.values()),
+            getSideInputIdToPCollectionViewMap(stagePayload, components),
             context.getPipelineOptions(),
             stagePayload,
             context.getJobInfo(),
             FlinkExecutableStageContext.batchFactory(),
             collectionIdToTupleTag);
 
-    outputStream =
-        inputDataStream.transform(transform.getUniqueName(), outputTypeInformation, doFnOperator);
+    if (transformedSideInputs.unionTagToView.isEmpty()) {
+      outputStream =
+          inputDataStream.transform(transform.getUniqueName(), outputTypeInformation, doFnOperator);
+    } else {
+      outputStream =
+          inputDataStream
+              .connect(transformedSideInputs.unionedSideInputs.broadcast())
+              .transform(transform.getUniqueName(), outputTypeInformation, doFnOperator);
+    }
 
     if (mainOutputTag != null) {
       context.addDataStream(outputs.get(mainOutputTag.getId()), outputStream);
@@ -520,6 +553,170 @@ private void translateImpulse(
     }
   }
 
+  private static LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>
+      getSideInputIdToPCollectionViewMap(
+          RunnerApi.ExecutableStagePayload stagePayload, RunnerApi.Components components) {
+
+    RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components);
+
+    LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputs =
+        new LinkedHashMap<>();
+    // for PCollectionView compatibility, not used to transform materialization
+    ViewFn<Iterable<WindowedValue<?>>, ?> viewFn =
+        (ViewFn) new PCollectionViews.MultimapViewFn<Iterable<WindowedValue<Void>>, Void>();
+
+    for (RunnerApi.ExecutableStagePayload.SideInputId sideInputId :
+        stagePayload.getSideInputsList()) {
+
+      // TODO: local name is unique as long as only one transform with side input can be within a stage
+      String sideInputTag = sideInputId.getLocalName();
+      String collectionId =
+          components
+              .getTransformsOrThrow(sideInputId.getTransformId())
+              .getInputsOrThrow(sideInputId.getLocalName());
+      RunnerApi.WindowingStrategy windowingStrategyProto =
+          components.getWindowingStrategiesOrThrow(
+              components.getPcollectionsOrThrow(collectionId).getWindowingStrategyId());
+
+      final WindowingStrategy<?, ?> windowingStrategy;
+      try {
+        windowingStrategy =
+            WindowingStrategyTranslation.fromProto(windowingStrategyProto, rehydratedComponents);
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalStateException(
+            String.format(
+                "Unable to hydrate side input windowing strategy %s.", 
windowingStrategyProto),
+            e);
+      }
+
+      Coder<WindowedValue<Object>> coder = instantiateCoder(collectionId, components);
+      // side input materialization via GBK (T -> Iterable<T>)
+      WindowedValueCoder wvCoder = (WindowedValueCoder) coder;
+      coder = wvCoder.withValueCoder(IterableCoder.of(wvCoder.getValueCoder()));
+
+      sideInputs.put(
+          sideInputId,
+          new RunnerPCollectionView<>(
+              null,
+              new TupleTag<>(sideInputTag),
+              viewFn,
+              // TODO: support custom mapping fn
+              windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+              windowingStrategy,
+              coder));
+    }
+    return sideInputs;
+  }
+
+  private TransformedSideInputs transformSideInputs(
+      RunnerApi.ExecutableStagePayload stagePayload,
+      RunnerApi.Components components,
+      StreamingTranslationContext context) {
+
+    LinkedHashMap<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputs =
+        getSideInputIdToPCollectionViewMap(stagePayload, components);
+
+    Map<TupleTag<?>, Integer> tagToIntMapping = new HashMap<>();
+    Map<Integer, PCollectionView<?>> intToViewMapping = new HashMap<>();
+    List<WindowedValueCoder<KV<Void, Object>>> kvCoders = new ArrayList<>();
+    List<Coder<?>> viewCoders = new ArrayList<>();
+
+    int count = 0;
+    for (Map.Entry<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInput :
+        sideInputs.entrySet()) {
+      TupleTag<?> tag = sideInput.getValue().getTagInternal();
+      intToViewMapping.put(count, sideInput.getValue());
+      tagToIntMapping.put(tag, count);
+      count++;
+      String collectionId =
+          components
+              .getTransformsOrThrow(sideInput.getKey().getTransformId())
+              .getInputsOrThrow(sideInput.getKey().getLocalName());
+      DataStream<Object> sideInputStream = context.getDataStreamOrThrow(collectionId);
+      TypeInformation<Object> tpe = sideInputStream.getType();
+      if (!(tpe instanceof CoderTypeInformation)) {
+        throw new IllegalStateException("Input Stream TypeInformation is no CoderTypeInformation.");
+      }
+
+      WindowedValueCoder<Object> coder =
+          (WindowedValueCoder) ((CoderTypeInformation) tpe).getCoder();
+      Coder<KV<Void, Object>> kvCoder = KvCoder.of(VoidCoder.of(), coder.getValueCoder());
+      kvCoders.add(coder.withValueCoder(kvCoder));
+      // coder for materialized view matching GBK below
+      WindowedValueCoder<KV<Void, Iterable<Object>>> viewCoder =
+          coder.withValueCoder(KvCoder.of(VoidCoder.of(), IterableCoder.of(coder.getValueCoder())));
+      viewCoders.add(viewCoder);
+    }
+
+    // second pass, now that we gathered the input coders
+    UnionCoder unionCoder = UnionCoder.of(viewCoders);
+
+    CoderTypeInformation<RawUnionValue> unionTypeInformation =
+        new CoderTypeInformation<>(unionCoder);
+
+    // transform each side input to RawUnionValue and union them
+    DataStream<RawUnionValue> sideInputUnion = null;
+
+    for (Map.Entry<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInput :
+        sideInputs.entrySet()) {
+      TupleTag<?> tag = sideInput.getValue().getTagInternal();
+      final int intTag = tagToIntMapping.get(tag);
+      String collectionId =
+          components
+              .getTransformsOrThrow(sideInput.getKey().getTransformId())
+              .getInputsOrThrow(sideInput.getKey().getLocalName());
+      DataStream<WindowedValue<?>> sideInputStream = context.getDataStreamOrThrow(collectionId);
+
+      // insert GBK to materialize side input view
+      String viewName =
+          sideInput.getKey().getTransformId() + "-" + sideInput.getKey().getLocalName();
+      WindowedValueCoder<KV<Void, Object>> kvCoder = kvCoders.get(intTag);
+      DataStream<WindowedValue<KV<Void, Object>>> keyedSideInputStream =
+          sideInputStream.map(new ToVoidKeyValue());
+
+      SingleOutputStreamOperator<WindowedValue<KV<Void, Iterable<Object>>>> viewStream =
+          addGBK(
+              keyedSideInputStream,
+              sideInput.getValue().getWindowingStrategyInternal(),
+              kvCoder,
+              viewName,
+              context);
+
+      DataStream<RawUnionValue> unionValueStream =
+          viewStream
+              .map(new FlinkStreamingTransformTranslators.ToRawUnion<>(intTag))
+              .returns(unionTypeInformation);
+
+      if (sideInputUnion == null) {
+        sideInputUnion = unionValueStream;
+      } else {
+        sideInputUnion = sideInputUnion.union(unionValueStream);
+      }
+    }
+
+    return new TransformedSideInputs(intToViewMapping, sideInputUnion);
+  }
+
+  private static class TransformedSideInputs {
+    final Map<Integer, PCollectionView<?>> unionTagToView;
+    final DataStream<RawUnionValue> unionedSideInputs;
+
+    TransformedSideInputs(
+        Map<Integer, PCollectionView<?>> unionTagToView,
+        DataStream<RawUnionValue> unionedSideInputs) {
+      this.unionTagToView = unionTagToView;
+      this.unionedSideInputs = unionedSideInputs;
+    }
+  }
+
+  private static class ToVoidKeyValue<T>
+      implements MapFunction<WindowedValue<T>, WindowedValue<KV<Void, T>>> {
+    @Override
+    public WindowedValue<KV<Void, T>> map(WindowedValue<T> value) {
+      return value.withValue(KV.of(null, value.getValue()));
+    }
+  }
+
   static <T> Coder<WindowedValue<T>> instantiateCoder(
       String collectionId, RunnerApi.Components components) {
     PipelineNode.PCollectionNode collectionNode =
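To summarize the streaming wiring added above: each side input is materialized
with a GBK (addGBK), tagged into a RawUnionValue, the tagged streams are unioned
and broadcast, and the union becomes the operator's second input. A condensed
paraphrase (not compilable on its own; the stream variables are assumed):

    DataStream<RawUnionValue> sideInputUnion =
        viewStreamA.map(new ToRawUnion<>(0)).returns(unionTypeInformation)
            .union(viewStreamB.map(new ToRawUnion<>(1)).returns(unionTypeInformation));

    outputStream =
        inputDataStream
            .connect(sideInputUnion.broadcast())
            .transform(transformName, outputTypeInformation, doFnOperator);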
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 7564f291537..ce8063813fd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -323,7 +323,7 @@ public void translateNode(
   }
 
   /** Wraps each element in a {@link RawUnionValue} with the given tag id. */
-  private static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
+  public static class ToRawUnion<T> implements MapFunction<T, RawUnionValue> {
     private final int intTag;
 
     public ToRawUnion(int intTag) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/.DS_Store b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/.DS_Store
new file mode 100644
index 00000000000..454b32d8285
Binary files /dev/null and b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/.DS_Store differ
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
index 5ba04c60739..d66dd5009dc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchExecutableStageContext.java
@@ -51,8 +51,7 @@ public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage)
     return jobBundleFactory.forStage(executableStage);
   }
 
-  @Override
-  public StateRequestHandler getStateRequestHandler(
+  public static StateRequestHandler getStateRequestHandler(
       ExecutableStage executableStage, RuntimeContext runtimeContext) {
     SideInputHandlerFactory sideInputHandlerFactory =
         FlinkBatchSideInputHandlerFactory.forStage(executableStage, runtimeContext);
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
index df7d040bc58..b912b2a7c4f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
@@ -161,14 +161,6 @@ private FlinkBatchSideInputHandlerFactory(
     return new MultimapSideInputHandler(multimap.build(), keyCoder, valueCoder, windowCoder);
   }
 
-  private <T> List<WindowedValue<T>> getBroadcastVariable(String transformId, String sideInputId) {
-    PCollectionNode collectionNode =
-        sideInputToCollection.get(
-            SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
-    checkArgument(collectionNode != null, "No side input for %s/%s", transformId, sideInputId);
-    return runtimeContext.getBroadcastVariable(collectionNode.getId());
-  }
-
   private static class MultimapSideInputHandler<K, V, W extends BoundedWindow>
       implements SideInputHandler<V, W> {
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
index ba59b534742..0dcaee8687f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -21,8 +21,6 @@
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
-import org.apache.flink.api.common.functions.RuntimeContext;
 
 /** The Flink context required in order to execute {@link ExecutableStage stages}. */
 public interface FlinkExecutableStageContext extends AutoCloseable {
@@ -42,7 +40,4 @@ static Factory batchFactory() {
   }
 
   StageBundleFactory getStageBundleFactory(ExecutableStage executableStage);
-
-  StateRequestHandler getStateRequestHandler(
-      ExecutableStage executableStage, RuntimeContext runtimeContext);
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index c501274d0b0..31ea5ec2426 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -93,7 +93,8 @@ public void open(Configuration parameters) throws Exception {
     // NOTE: It's safe to reuse the state handler between partitions because each partition uses the
     // same backing runtime context and broadcast variables. We use checkState below to catch errors
     // in backward-incompatible Flink changes.
-    stateRequestHandler = stageContext.getStateRequestHandler(executableStage, runtimeContext);
+    stateRequestHandler =
+        FlinkBatchExecutableStageContext.getStateRequestHandler(executableStage, runtimeContext);
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     progressHandler = BundleProgressHandler.unsupported();
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
new file mode 100644
index 00000000000..7ac5ae3ca2e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStreamingSideInputHandlerFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * {@link StateRequestHandler} that uses {@link org.apache.beam.runners.core.SideInputHandler} to
+ * access the Flink broadcast state that represents side inputs.
+ */
+public class FlinkStreamingSideInputHandlerFactory implements SideInputHandlerFactory {
+
+  // Map from side input id to global PCollection id.
+  private final Map<SideInputId, PCollectionView<?>> sideInputToCollection;
+  private final org.apache.beam.runners.core.SideInputHandler runnerHandler;
+
+  /**
+   * Creates a new state handler for the given stage. Note that this requires a traversal of the
+   * stage itself, so this should only be called once per stage rather than once per bundle.
+   */
+  public static FlinkStreamingSideInputHandlerFactory forStage(
+      ExecutableStage stage,
+      Map<SideInputId, PCollectionView<?>> viewMapping,
+      org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+    ImmutableMap.Builder<SideInputId, PCollectionView<?>> sideInputBuilder = ImmutableMap.builder();
+    for (SideInputReference sideInput : stage.getSideInputs()) {
+      SideInputId sideInputId =
+          SideInputId.newBuilder()
+              .setTransformId(sideInput.transform().getId())
+              .setLocalName(sideInput.localName())
+              .build();
+      sideInputBuilder.put(
+          sideInputId,
+          checkNotNull(
+              viewMapping.get(sideInputId),
+              "No side input for %s/%s",
+              sideInputId.getTransformId(),
+              sideInputId.getLocalName()));
+    }
+
+    FlinkStreamingSideInputHandlerFactory factory =
+        new FlinkStreamingSideInputHandlerFactory(sideInputBuilder.build(), runnerHandler);
+    return factory;
+  }
+
+  private FlinkStreamingSideInputHandlerFactory(
+      Map<SideInputId, PCollectionView<?>> sideInputToCollection,
+      org.apache.beam.runners.core.SideInputHandler runnerHandler) {
+    this.sideInputToCollection = sideInputToCollection;
+    this.runnerHandler = runnerHandler;
+  }
+
+  @Override
+  public <T, V, W extends BoundedWindow> SideInputHandler<V, W> forSideInput(
+      String transformId,
+      String sideInputId,
+      RunnerApi.FunctionSpec accessPattern,
+      Coder<T> elementCoder,
+      Coder<W> windowCoder) {
+
+    PCollectionView collectionNode =
+        sideInputToCollection.get(
+            SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
+    checkArgument(collectionNode != null, "No side input for %s/%s", transformId, sideInputId);
+
+    if (PTransformTranslation.ITERABLE_SIDE_INPUT.equals(accessPattern.getUrn())) {
+      @SuppressWarnings("unchecked") // T == V
+      Coder<V> outputCoder = (Coder<V>) elementCoder;
+      return forIterableSideInput(collectionNode, outputCoder);
+    } else if (PTransformTranslation.MULTIMAP_SIDE_INPUT.equals(accessPattern.getUrn())
+        || Materializations.MULTIMAP_MATERIALIZATION_URN.equals(accessPattern.getUrn())) {
+      // TODO: Remove non standard URN.
+      // Using non standard version of multimap urn as dataflow uses the non standard urn.
+      @SuppressWarnings("unchecked") // T == KV<?, V>
+      KvCoder<?, V> kvCoder = (KvCoder<?, V>) elementCoder;
+      return forMultimapSideInput(collectionNode, kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Unknown side input access pattern: %s", 
accessPattern));
+    }
+  }
+
+  private <T, W extends BoundedWindow> SideInputHandler<T, W> forIterableSideInput(
+      PCollectionView<?> collection, Coder<T> elementCoder) {
+
+    return new SideInputHandler<T, W>() {
+      @Override
+      public Iterable<T> get(byte[] key, W window) {
+        return checkNotNull(
+            (Iterable<T>) runnerHandler.getIterable(collection, window),
+            "Element processed by SDK before side input is ready");
+      }
+
+      @Override
+      public Coder<T> resultCoder() {
+        return elementCoder;
+      }
+    };
+  }
+
+  private <K, V, W extends BoundedWindow> SideInputHandler<V, W> forMultimapSideInput(
+      PCollectionView<?> collection, Coder<K> keyCoder, Coder<V> valueCoder) {
+
+    return new SideInputHandler<V, W>() {
+      @Override
+      public Iterable<V> get(byte[] key, W window) {
+        Iterable<KV<K, V>> values =
+            (Iterable<KV<K, V>>) runnerHandler.getIterable(collection, window);
+        ArrayList<V> result = new ArrayList<>();
+        // find values for the given key
+        for (KV<K, V> kv : values) {
+          ByteArrayOutputStream bos = new ByteArrayOutputStream();
+          try {
+            keyCoder.encode(kv.getKey(), bos);
+            if (Arrays.equals(key, bos.toByteArray())) {
+              result.add(kv.getValue());
+            }
+          } catch (IOException ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+        return result;
+      }
+
+      @Override
+      public Coder<V> resultCoder() {
+        return valueCoder;
+      }
+    };
+  }
+}
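A standalone restatement of the multimap lookup in forMultimapSideInput above:
values are selected by comparing coder-encoded key bytes. The helper class and
method names below are illustrative, not part of the diff:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.values.KV;

class MultimapLookupSketch {
  static <K, V> List<V> valuesForKey(
      byte[] encodedKey, Iterable<KV<K, V>> elements, Coder<K> keyCoder) {
    List<V> result = new ArrayList<>();
    for (KV<K, V> kv : elements) {
      // Encode this element's key and compare it byte-wise with the requested key.
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try {
        keyCoder.encode(kv.getKey(), bos);
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
      if (Arrays.equals(encodedKey, bos.toByteArray())) {
        result.add(kv.getValue());
      }
    }
    return result;
  }
}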
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
index 5121038010c..565e0b9771b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -29,10 +29,8 @@
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
 import org.apache.beam.sdk.fn.function.ThrowingFunction;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -178,12 +176,6 @@ public StageBundleFactory getStageBundleFactory(ExecutableStage executableStage)
       return context.getStageBundleFactory(executableStage);
     }
 
-    @Override
-    public StateRequestHandler getStateRequestHandler(
-        ExecutableStage executableStage, RuntimeContext runtimeContext) {
-      return context.getStateRequestHandler(executableStage, runtimeContext);
-    }
-
     @Override
     public void close() {
       // Just schedule the context as we want to reuse it if possible.
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1bfe830a5ef..3a92808eddd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -475,6 +475,22 @@ public final void processElement1(StreamRecord<WindowedValue<InputT>> streamReco
     checkInvokeFinishBundleByCount();
   }
 
+  /**
+   * Add the side input value. Here we are assuming that views have already been materialized and
+   * are sent over the wire as {@link Iterable}. Subclasses may elect to perform materialization in
+   * state and receive side input incrementally instead.
+   *
+   * @param streamRecord the stream record containing the materialized side input value
+   */
+  protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
+    @SuppressWarnings("unchecked")
+    WindowedValue<Iterable<?>> value =
+        (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
+
+    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
+    sideInputHandler.addSideInputValue(sideInput, value);
+  }
+
   @Override
  public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
     // we finish the bundle because the newly arrived side-input might
@@ -484,12 +500,8 @@ public final void processElement2(StreamRecord<RawUnionValue> streamRecord) thro
     invokeFinishBundle();
     checkInvokeStartBundle();
 
-    @SuppressWarnings("unchecked")
-    WindowedValue<Iterable<?>> value =
-        (WindowedValue<Iterable<?>>) streamRecord.getValue().getValue();
-
-    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
-    sideInputHandler.addSideInputValue(sideInput, value);
+    // add the side input, which may cause pushed-back elements to become eligible for processing
+    addSideInputValue(streamRecord);
 
     List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index c0e3858658a..62f9176b3b4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -28,23 +31,28 @@
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
+import org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,12 +76,14 @@
   private final JobInfo jobInfo;
   private final FlinkExecutableStageContext.Factory contextFactory;
   private final Map<String, TupleTag<?>> outputMap;
+  private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds;
 
   private transient FlinkExecutableStageContext stageContext;
   private transient StateRequestHandler stateRequestHandler;
   private transient BundleProgressHandler progressHandler;
   private transient StageBundleFactory stageBundleFactory;
   private transient LinkedBlockingQueue<KV<String, OutputT>> outputQueue;
+  private transient ExecutableStage executableStage;
 
   public ExecutableStageDoFnOperator(
       String stepName,
@@ -85,6 +95,7 @@ public ExecutableStageDoFnOperator(
       OutputManagerFactory<OutputT> outputManagerFactory,
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
+      Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputIds,
       PipelineOptions options,
       RunnerApi.ExecutableStagePayload payload,
       JobInfo jobInfo,
@@ -109,28 +120,46 @@ public ExecutableStageDoFnOperator(
     this.jobInfo = jobInfo;
     this.contextFactory = contextFactory;
     this.outputMap = outputMap;
+    this.sideInputIds = sideInputIds;
   }
 
   @Override
   public void open() throws Exception {
     super.open();
 
-    ExecutableStage executableStage = ExecutableStage.fromPayload(payload);
+    executableStage = ExecutableStage.fromPayload(payload);
     // TODO: Wire this into the distributed cache and make it pluggable.
     // TODO: Do we really want this layer of indirection when accessing the stage bundle factory?
     // It's a little strange because this operator is responsible for the lifetime of the stage
     // bundle "factory" (manager?) but not the job or Flink bundle factories. How do we make
     // ownership of the higher level "factories" explicit? Do we care?
     stageContext = contextFactory.get(jobInfo);
-    // NOTE: It's safe to reuse the state handler between partitions because each partition uses the
-    // same backing runtime context and broadcast variables. We use checkState below to catch errors
-    // in backward-incompatible Flink changes.
-    stateRequestHandler = stageContext.getStateRequestHandler(executableStage, getRuntimeContext());
+
+    stateRequestHandler = getStateRequestHandler(executableStage);
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     progressHandler = BundleProgressHandler.unsupported();
     outputQueue = new LinkedBlockingQueue<>();
   }
 
+  private StateRequestHandler getStateRequestHandler(ExecutableStage executableStage) {
+
+    if (executableStage.getSideInputs().size() > 0) {
+      checkNotNull(super.sideInputHandler);
+      StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory =
+          Preconditions.checkNotNull(
+              FlinkStreamingSideInputHandlerFactory.forStage(
+                  executableStage, sideInputIds, super.sideInputHandler));
+      try {
+        return StateRequestHandlers.forSideInputHandlerFactory(
+            ProcessBundleDescriptors.getSideInputs(executableStage), sideInputHandlerFactory);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return StateRequestHandler.unsupported();
+    }
+  }
+
   // TODO: currently assumes that every element is a separate bundle,
  // but this can be changed by pushing some of this logic into the "DoFnRunner"
  private void processElementWithSdkHarness(WindowedValue<InputT> element) throws Exception {
@@ -174,6 +203,15 @@ public void close() throws Exception {
     super.close();
   }
 
+  @Override
+  protected void addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
+    @SuppressWarnings("unchecked")
+    WindowedValue<KV<Void, Iterable<?>>> value =
+        (WindowedValue<KV<Void, Iterable<?>>>) streamRecord.getValue().getValue();
+    PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
+    sideInputHandler.addSideInputValue(sideInput, value.withValue(value.getValue().getValue()));
+  }
+
   // TODO: remove single element bundle assumption
   @Override
   protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index dba1c05cee5..86d25411cd2 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -71,6 +71,7 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.mockito.internal.util.reflection.Whitebox;
 
 /** Tests for {@link ExecutableStageDoFnOperator}. */
 @RunWith(JUnit4.class)
@@ -100,7 +101,6 @@
   public void setUpMocks() {
     MockitoAnnotations.initMocks(this);
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
-    when(stageContext.getStateRequestHandler(any(), any())).thenReturn(stateRequestHandler);
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
   }
 
@@ -328,6 +328,7 @@ public void testSerialization() {
             outputManagerFactory,
             Collections.emptyMap() /* sideInputTagMapping */,
             Collections.emptyList() /* sideInputs */,
+            Collections.emptyMap() /* sideInputId mapping */,
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
             stagePayload,
             jobInfo,
@@ -364,12 +365,14 @@ public void testSerialization() {
             outputManagerFactory,
             Collections.emptyMap() /* sideInputTagMapping */,
             Collections.emptyList() /* sideInputs */,
+            Collections.emptyMap() /* sideInputId mapping */,
             PipelineOptionsFactory.as(FlinkPipelineOptions.class),
             stagePayload,
             jobInfo,
             contextFactory,
             createOutputMap(mainOutput, additionalOutputs));
 
+    Whitebox.setInternalState(operator, "stateRequestHandler", stateRequestHandler);
     return operator;
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index 1f261f5a59b..f6e4ba26b35 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -55,6 +55,7 @@
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.mockito.internal.util.reflection.Whitebox;
 
 /** Tests for {@link FlinkExecutableStageFunction}. */
 @RunWith(JUnit4.class)
@@ -85,7 +86,6 @@
   public void setUpMocks() {
     MockitoAnnotations.initMocks(this);
     when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
-    when(stageContext.getStateRequestHandler(any(), any())).thenReturn(stateRequestHandler);
     when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
   }
 
@@ -223,8 +223,9 @@ public void testStageBundleClosed() throws Exception {
         Mockito.mock(FlinkExecutableStageContext.Factory.class);
     when(contextFactory.get(any())).thenReturn(stageContext);
     FlinkExecutableStageFunction<Integer> function =
-        new FlinkExecutableStageFunction<Integer>(stagePayload, jobInfo, outputMap, contextFactory);
+        new FlinkExecutableStageFunction<>(stagePayload, jobInfo, outputMap, contextFactory);
     function.setRuntimeContext(runtimeContext);
+    Whitebox.setInternalState(function, "stateRequestHandler", stateRequestHandler);
     return function;
   }
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 137935)
    Time Spent: 7h 20m  (was: 7h 10m)

> Flink support for portable side input
> -------------------------------------
>
>                 Key: BEAM-2930
>                 URL: https://issues.apache.org/jira/browse/BEAM-2930
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Henning Rohde
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability
>          Time Spent: 7h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
