[ 
https://issues.apache.org/jira/browse/BEAM-4285?focusedWorklogId=113981&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113981
 ]

ASF GitHub Bot logged work on BEAM-4285:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jun/18 00:19
            Start Date: 21/Jun/18 00:19
    Worklog Time Spent: 10m 
      Work Description: asfgit closed pull request #5688: [BEAM-4285] Implement 
Flink batch side input handler
URL: https://github.com/apache/beam/pull/5688
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
index 278e5c6cc07..ee77a873e74 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
@@ -30,7 +30,7 @@
 /** An {@link ExecutableStage} which is constructed with all of its initial 
state. */
 @AutoValue
 public abstract class ImmutableExecutableStage implements ExecutableStage {
-  static ImmutableExecutableStage ofFullComponents(
+  public static ImmutableExecutableStage ofFullComponents(
       Components components,
       Environment environment,
       PCollectionNode input,
@@ -49,7 +49,7 @@ static ImmutableExecutableStage ofFullComponents(
     return of(prunedComponents, environment, input, sideInputs, transforms, 
outputs);
   }
 
-  static ImmutableExecutableStage of(
+  public static ImmutableExecutableStage of(
       Components components,
       Environment environment,
       PCollectionNode input,
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
index 7a040573b42..6d5c9736dc1 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/BatchFlinkExecutableStageContext.java
@@ -21,16 +21,23 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalNotification;
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.DockerJobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
+import 
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
+import 
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandlerFactory;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+// TODO: Rename this to FlinkBatchExecutableStageContext for consistency.
 /** Implementation of a {@link FlinkExecutableStageContext} for batch jobs. */
 class BatchFlinkExecutableStageContext implements FlinkExecutableStageContext {
   private static final Logger LOG = 
LoggerFactory.getLogger(BatchFlinkExecutableStageContext.class);
@@ -54,7 +61,26 @@ private BatchFlinkExecutableStageContext(JobBundleFactory 
jobBundleFactory) {
   @Override
   public StateRequestHandler getStateRequestHandler(
       ExecutableStage executableStage, RuntimeContext runtimeContext) {
-    return FlinkBatchStateRequestHandler.forStage(executableStage, 
runtimeContext);
+    MultimapSideInputHandlerFactory sideInputHandlerFactory =
+        FlinkBatchSideInputHandlerFactory.forStage(executableStage, 
runtimeContext);
+    ExecutableProcessBundleDescriptor processBundleDescriptor;
+    try {
+      // NOTE: We require an executable bundle descriptor for the 
StateRequestHandlers construction
+      // below. This only uses the bundle descriptor for side input specs and 
effectively ignores
+      // data and state endpoints. We rely on the fact that PCollections and 
coders are structurally
+      // identical between instantiations here to prevent having to wire the 
original executable
+      // bundle descriptor here. The correct long-term fix is to move side 
input logic out of
+      // ExecutableProcessBundleDescriptor and into ExecutableStage.
+      processBundleDescriptor =
+          ProcessBundleDescriptors.fromExecutableStage(
+              "id", executableStage, 
Endpoints.ApiServiceDescriptor.getDefaultInstance());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    StateRequestHandler stateRequestHandler =
+        StateRequestHandlers.forMultimapSideInputHandlerFactory(
+            processBundleDescriptor, sideInputHandlerFactory);
+    return stateRequestHandler;
   }
 
   private void cleanUp() throws Exception {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
new file mode 100644
index 00000000000..c07647c6be9
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import 
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler;
+import 
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandlerFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * {@link StateRequestHandler} that uses a Flink {@link RuntimeContext} to 
access Flink broadcast
+ * variable that represent side inputs.
+ */
+class FlinkBatchSideInputHandlerFactory implements 
MultimapSideInputHandlerFactory {
+
+  // Map from side input id to global PCollection id.
+  private final Map<SideInputId, PCollectionNode> sideInputToCollection;
+  private final RuntimeContext runtimeContext;
+
+  /**
+   * Creates a new state handler for the given stage. Note that this requires 
a traversal of the
+   * stage itself, so this should only be called once per stage rather than 
once per bundle.
+   */
+  static FlinkBatchSideInputHandlerFactory forStage(
+      ExecutableStage stage, RuntimeContext runtimeContext) {
+    ImmutableMap.Builder<SideInputId, PCollectionNode> sideInputBuilder = 
ImmutableMap.builder();
+    for (SideInputReference sideInput : stage.getSideInputs()) {
+      sideInputBuilder.put(
+          SideInputId.newBuilder()
+              .setTransformId(sideInput.transform().getId())
+              .setLocalName(sideInput.localName())
+              .build(),
+          sideInput.collection());
+    }
+    return new FlinkBatchSideInputHandlerFactory(sideInputBuilder.build(), 
runtimeContext);
+  }
+
+  private FlinkBatchSideInputHandlerFactory(
+      Map<SideInputId, PCollectionNode> sideInputToCollection, RuntimeContext 
runtimeContext) {
+    this.sideInputToCollection = sideInputToCollection;
+    this.runtimeContext = runtimeContext;
+  }
+
+  @Override
+  public <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> 
forSideInput(
+      String transformId,
+      String sideInputId,
+      Coder<K> keyCoder,
+      Coder<V> valueCoder,
+      Coder<W> windowCoder) {
+    PCollectionNode collectionNode =
+        sideInputToCollection.get(
+            
SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
+    checkArgument(collectionNode != null, "No side input for %s/%s", 
transformId, sideInputId);
+    List<WindowedValue<KV<K, V>>> broadcastVariable =
+        runtimeContext.getBroadcastVariable(collectionNode.getId());
+
+    ImmutableMultimap.Builder<SideInputKey<K, W>, V> multimap = 
ImmutableMultimap.builder();
+    for (WindowedValue<KV<K, V>> windowedValue : broadcastVariable) {
+      K key = windowedValue.getValue().getKey();
+      V value = windowedValue.getValue().getValue();
+
+      for (BoundedWindow boundedWindow : windowedValue.getWindows()) {
+        @SuppressWarnings("unchecked")
+        W window = (W) boundedWindow;
+        multimap.put(SideInputKey.of(key, window), value);
+      }
+    }
+
+    return new SideInputHandler<>(multimap.build());
+  }
+
+  private static class SideInputHandler<K, V, W extends BoundedWindow>
+      implements MultimapSideInputHandler<K, V, W> {
+
+    private final Multimap<SideInputKey<K, W>, V> collection;
+
+    private SideInputHandler(Multimap<SideInputKey<K, W>, V> collection) {
+      this.collection = collection;
+    }
+
+    @Override
+    public Iterable<V> get(K key, W window) {
+      return collection.get(SideInputKey.of(key, window));
+    }
+  }
+
+  @AutoValue
+  abstract static class SideInputKey<K, W extends BoundedWindow> {
+    static <K, W extends BoundedWindow> SideInputKey<K, W> of(K key, W window) 
{
+      return new 
AutoValue_FlinkBatchSideInputHandlerFactory_SideInputKey<>(key, window);
+    }
+
+    @Nullable
+    abstract K key();
+
+    abstract W window();
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java
deleted file mode 100644
index cdf06204c33..00000000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.concurrent.CompletionStage;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse.Builder;
-import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-// TODO: https://issues.apache.org/jira/browse/BEAM-4285 Implement batch state 
handler.
-/** State request handler for flink batch runner. */
-public class FlinkBatchStateRequestHandler implements StateRequestHandler {
-
-  private FlinkBatchStateRequestHandler() {}
-
-  public static FlinkBatchStateRequestHandler forStage(
-      ExecutableStage stage, RuntimeContext runtimeContext) {
-    return new FlinkBatchStateRequestHandler();
-  }
-
-  @Override
-  public CompletionStage<Builder> handle(StateRequest request) throws 
Exception {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactoryTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactoryTest.java
new file mode 100644
index 00000000000..f3afd0b5d7d
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchSideInputHandlerFactoryTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
+import 
org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link FlinkBatchSideInputHandlerFactory}. */
+@RunWith(JUnit4.class)
+public class FlinkBatchSideInputHandlerFactoryTest {
+
+  private static final String TRANSFORM_ID = "transform-id";
+  private static final String SIDE_INPUT_NAME = "side-input";
+  private static final String COLLECTION_ID = "collection";
+  private static final ExecutableStage EXECUTABLE_STAGE =
+      createExecutableStage(
+          Arrays.asList(
+              SideInputReference.of(
+                  PipelineNode.pTransform(TRANSFORM_ID, 
RunnerApi.PTransform.getDefaultInstance()),
+                  SIDE_INPUT_NAME,
+                  PipelineNode.pCollection(
+                      COLLECTION_ID, 
RunnerApi.PCollection.getDefaultInstance()))));
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private RuntimeContext context;
+
+  @Before
+  public void setUpMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void invalidSideInputThrowsException() {
+    ExecutableStage stage = createExecutableStage(Collections.emptyList());
+    FlinkBatchSideInputHandlerFactory factory =
+        FlinkBatchSideInputHandlerFactory.forStage(stage, context);
+    thrown.expect(instanceOf(IllegalArgumentException.class));
+    factory.forSideInput(
+        "transform-id", "side-input", VoidCoder.of(), VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE);
+  }
+
+  @Test
+  public void emptyResultForEmptyCollection() {
+    FlinkBatchSideInputHandlerFactory factory =
+        FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
+    MultimapSideInputHandler<Void, Integer, GlobalWindow> handler =
+        factory.forSideInput(
+            TRANSFORM_ID,
+            SIDE_INPUT_NAME,
+            VoidCoder.of(),
+            VarIntCoder.of(),
+            GlobalWindow.Coder.INSTANCE);
+    // We never populated the broadcast variable for "side-input", so the mock 
will return an empty
+    // list.
+    Iterable<Integer> result = handler.get(null, GlobalWindow.INSTANCE);
+    assertThat(result, emptyIterable());
+  }
+
+  @Test
+  public void singleElementForCollection() {
+    when(context.getBroadcastVariable(COLLECTION_ID))
+        .thenReturn(
+            Arrays.asList(WindowedValue.valueInGlobalWindow(KV.<Void, 
Integer>of(null, 3))));
+
+    FlinkBatchSideInputHandlerFactory factory =
+        FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
+    MultimapSideInputHandler<Void, Integer, GlobalWindow> handler =
+        factory.forSideInput(
+            TRANSFORM_ID,
+            SIDE_INPUT_NAME,
+            VoidCoder.of(),
+            VarIntCoder.of(),
+            GlobalWindow.Coder.INSTANCE);
+    Iterable<Integer> result = handler.get(null, GlobalWindow.INSTANCE);
+    assertThat(result, contains(3));
+  }
+
+  @Test
+  public void groupsValuesByKey() {
+    when(context.getBroadcastVariable(COLLECTION_ID))
+        .thenReturn(
+            Arrays.asList(
+                WindowedValue.valueInGlobalWindow(KV.of("foo", 2)),
+                WindowedValue.valueInGlobalWindow(KV.of("bar", 3)),
+                WindowedValue.valueInGlobalWindow(KV.of("foo", 5))));
+
+    FlinkBatchSideInputHandlerFactory factory =
+        FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
+    MultimapSideInputHandler<String, Integer, GlobalWindow> handler =
+        factory.forSideInput(
+            TRANSFORM_ID,
+            SIDE_INPUT_NAME,
+            StringUtf8Coder.of(),
+            VarIntCoder.of(),
+            GlobalWindow.Coder.INSTANCE);
+    Iterable<Integer> result = handler.get("foo", GlobalWindow.INSTANCE);
+    assertThat(result, containsInAnyOrder(2, 5));
+  }
+
+  @Test
+  public void groupsValuesByWindowAndKey() {
+    Instant instantA = new DateTime(2018, 1, 1, 1, 1, 
DateTimeZone.UTC).toInstant();
+    Instant instantB = new DateTime(2018, 1, 1, 1, 2, 
DateTimeZone.UTC).toInstant();
+    Instant instantC = new DateTime(2018, 1, 1, 1, 3, 
DateTimeZone.UTC).toInstant();
+    IntervalWindow windowA = new IntervalWindow(instantA, instantB);
+    IntervalWindow windowB = new IntervalWindow(instantB, instantC);
+    when(context.getBroadcastVariable(COLLECTION_ID))
+        .thenReturn(
+            Arrays.asList(
+                WindowedValue.of(KV.of("foo", 1), instantA, windowA, 
PaneInfo.NO_FIRING),
+                WindowedValue.of(KV.of("bar", 2), instantA, windowA, 
PaneInfo.NO_FIRING),
+                WindowedValue.of(KV.of("foo", 3), instantA, windowA, 
PaneInfo.NO_FIRING),
+                WindowedValue.of(KV.of("foo", 4), instantB, windowB, 
PaneInfo.NO_FIRING),
+                WindowedValue.of(KV.of("bar", 5), instantB, windowB, 
PaneInfo.NO_FIRING),
+                WindowedValue.of(KV.of("foo", 6), instantB, windowB, 
PaneInfo.NO_FIRING)));
+
+    FlinkBatchSideInputHandlerFactory factory =
+        FlinkBatchSideInputHandlerFactory.forStage(EXECUTABLE_STAGE, context);
+    MultimapSideInputHandler<String, Integer, IntervalWindow> handler =
+        factory.forSideInput(
+            TRANSFORM_ID,
+            SIDE_INPUT_NAME,
+            StringUtf8Coder.of(),
+            VarIntCoder.of(),
+            IntervalWindowCoder.of());
+    Iterable<Integer> resultA = handler.get("foo", windowA);
+    Iterable<Integer> resultB = handler.get("foo", windowB);
+    assertThat(resultA, containsInAnyOrder(1, 3));
+    assertThat(resultB, containsInAnyOrder(4, 6));
+  }
+
+  private static ExecutableStage 
createExecutableStage(Collection<SideInputReference> sideInputs) {
+    Components components = Components.getDefaultInstance();
+    Environment environment = Environment.getDefaultInstance();
+    PCollectionNode inputCollection =
+        PipelineNode.pCollection("collection-id", 
RunnerApi.PCollection.getDefaultInstance());
+    return ImmutableExecutableStage.of(
+        components,
+        environment,
+        inputCollection,
+        sideInputs,
+        Collections.emptyList(),
+        Collections.emptyList());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 113981)
    Time Spent: 2h  (was: 1h 50m)

> Flink batch state request handler
> ---------------------------------
>
>                 Key: BEAM-4285
>                 URL: https://issues.apache.org/jira/browse/BEAM-4285
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> In order to support side inputs, Flink needs a state service request handler. 
> As in the non-portable runner, we can start by handling batch side inputs via 
> Flink broadcast variables.
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
>  or 
> [https://github.com/bsidhom/beam/blob/41de3bce60f1ebc9211f299612a20d8e561f9b6f/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkBatchStateRequestHandler.java]
>  can be used as a starting point. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to