[ 
https://issues.apache.org/jira/browse/BEAM-2597?focusedWorklogId=101323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101323
 ]

ASF GitHub Bot logged work on BEAM-2597:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/18 22:16
            Start Date: 11/May/18 22:16
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5285: [BEAM-2597] Flink batch 
ExecutableStage operator
URL: https://github.com/apache/beam/pull/5285
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
new file mode 100644
index 00000000000..3881a0b365a
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/ArtifactSourcePool.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.Manifest;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+
+/**
+ * A pool of {@link ArtifactSource ArtifactSources} that can be used as a 
single source. At least
+ * one source must be registered before artifacts can be requested from it.
+ *
+ * <p>Artifact pooling is required for Flink operators that use the 
DistributedCache for artifact
+ * distribution. This is because distributed caches (along with other runtime 
context) are scoped to
+ * operator lifetimes but the artifact retrieval service must outlive the any 
remote environments it
+ * serves. Remote environments cannot be shared between jobs and are thus 
job-scoped.
+ *
+ * <p>Because of the peculiarities of artifact pooling and Flink, this class 
is packaged with the
+ * Flink runner rather than as a core fn-execution utility.
+ */
+@ThreadSafe
+public class ArtifactSourcePool implements ArtifactSource {
+
+  private ArtifactSourcePool() {}
+
+  /**
+   * Adds a new cache to the pool. When the returned {@link AutoCloseable} is 
closed, the given
+   * cache will be removed from the pool.
+   */
+  public AutoCloseable addToPool(ArtifactSource artifactSource) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Manifest getManifest() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void getArtifact(String name, StreamObserver<ArtifactChunk> 
responseObserver) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 01c93629046..0c8ed5d610b 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -49,6 +49,7 @@
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
@@ -56,6 +57,7 @@
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -63,6 +65,7 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -112,10 +115,17 @@
    * Creates a batch translation context. The resulting Flink execution dag 
will live in a new
    * {@link ExecutionEnvironment}.
    */
-  public static BatchTranslationContext 
createTranslationContext(FlinkPipelineOptions options) {
+  public static BatchTranslationContext createTranslationContext(JobInfo 
jobInfo) {
+    PipelineOptions pipelineOptions;
+    try {
+      pipelineOptions = 
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     ExecutionEnvironment executionEnvironment =
-        FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
-    return new BatchTranslationContext(options, executionEnvironment);
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+            pipelineOptions.as(FlinkPipelineOptions.class));
+    return new BatchTranslationContext(jobInfo, executionEnvironment);
   }
 
   /** Creates a batch translator. */
@@ -152,22 +162,21 @@ public static FlinkBatchPortablePipelineTranslator 
createTranslator() {
   public static class BatchTranslationContext
       implements FlinkPortablePipelineTranslator.TranslationContext {
 
-    private final FlinkPipelineOptions options;
+    private final JobInfo jobInfo;
     private final ExecutionEnvironment executionEnvironment;
     private final Map<String, DataSet<?>> dataSets;
     private final Set<String> danglingDataSets;
 
-    private BatchTranslationContext(
-        FlinkPipelineOptions options, ExecutionEnvironment 
executionEnvironment) {
-      this.options = options;
+    private BatchTranslationContext(JobInfo jobInfo, ExecutionEnvironment 
executionEnvironment) {
+      this.jobInfo = jobInfo;
       this.executionEnvironment = executionEnvironment;
       dataSets = new HashMap<>();
       danglingDataSets = new HashSet<>();
     }
 
     @Override
-    public FlinkPipelineOptions getPipelineOptions() {
-      return options;
+    public JobInfo getJobInfo() {
+      return jobInfo;
     }
 
     public ExecutionEnvironment getExecutionEnvironment() {
@@ -338,8 +347,9 @@ public void translate(BatchTranslationContext context, 
RunnerApi.Pipeline pipeli
     FlinkExecutableStageFunction<InputT> function =
         new FlinkExecutableStageFunction<>(
             stagePayload,
-            PipelineOptionsTranslation.toProto(context.getPipelineOptions()),
-            outputMap);
+            context.getJobInfo(),
+            outputMap,
+            FlinkExecutableStageContext.batchFactory());
 
     DataSet<WindowedValue<InputT>> inputDataSet =
         context.getDataSetOrThrow(stagePayload.getInput());
@@ -475,13 +485,18 @@ public void translate(BatchTranslationContext context, 
RunnerApi.Pipeline pipeli
     Grouping<WindowedValue<KV<K, V>>> inputGrouping =
         inputDataSet.groupBy(new 
KvKeySelector<>(inputElementCoder.getKeyCoder()));
 
+    PipelineOptions options;
+    try {
+      options = 
PipelineOptionsTranslation.fromProto(context.getJobInfo().pipelineOptions());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     FlinkPartialReduceFunction<K, V, List<V>, ?> partialReduceFunction =
         new FlinkPartialReduceFunction<>(
-            combineFn, windowingStrategy, Collections.emptyMap(), 
context.getPipelineOptions());
+            combineFn, windowingStrategy, Collections.emptyMap(), options);
 
     FlinkReduceFunction<K, List<V>, List<V>, ?> reduceFunction =
-        new FlinkReduceFunction<>(
-            combineFn, windowingStrategy, Collections.emptyMap(), 
context.getPipelineOptions());
+        new FlinkReduceFunction<>(combineFn, windowingStrategy, 
Collections.emptyMap(), options);
 
     // Partially GroupReduce the values into the intermediate format AccumT 
(combine)
     GroupCombineOperator<WindowedValue<KV<K, V>>, WindowedValue<KV<K, 
List<V>>>> groupCombine =
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
index d8a56c9badc..45ee3cca68a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.flink;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 
 /**
  * Interface for portable Flink translators. This allows for a uniform 
invocation pattern for
@@ -32,7 +33,7 @@
 
   /** The context used for pipeline translation. */
   interface TranslationContext {
-    FlinkPipelineOptions getPipelineOptions();
+    JobInfo getJobInfo();
   }
 
   /** Translates the given pipeline. */
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
new file mode 100644
index 00000000000..0fd4961f699
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageContext.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.io.Serializable;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/** The Flink context required in order to execute {@link ExecutableStage 
stages}. */
+public class FlinkExecutableStageContext {
+
+  /**
+   * Creates {@link FlinkExecutableStageContext} instances. Serializable so 
that factories can be
+   * defined at translation time and distributed to TaskManagers.
+   */
+  public interface Factory extends Serializable {
+    FlinkExecutableStageContext get(JobInfo jobInfo);
+  }
+
+  public static Factory batchFactory() {
+    return null;
+  }
+
+  public StageBundleFactory getStageBundleFactory(ExecutableStage 
executableStage) {
+    throw new UnsupportedOperationException();
+  }
+
+  public StateRequestHandler getStateRequestHandler(
+      ExecutableStage executableStage, RuntimeContext runtimeContext) {
+    throw new UnsupportedOperationException();
+  }
+
+  public ArtifactSourcePool getArtifactSourcePool() {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index ec29c2606f1..3189fcd3d05 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -17,12 +17,25 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import com.google.protobuf.Struct;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
@@ -38,36 +51,112 @@
 public class FlinkExecutableStageFunction<InputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, RawUnionValue> {
 
+  // Main constructor fields. All must be Serializable because Flink 
distributes Functions to
+  // task managers via java serialization.
+
   // The executable stage this function will run.
   private final RunnerApi.ExecutableStagePayload stagePayload;
   // Pipeline options. Used for provisioning api.
-  private final Struct pipelineOptions;
+  private final JobInfo jobInfo;
   // Map from PCollection id to the union tag used to represent this 
PCollection in the output.
   private final Map<String, Integer> outputMap;
+  private final FlinkExecutableStageContext.Factory contextFactory;
+
+  // Worker-local fields. These should only be constructed and consumed on 
Flink TaskManagers.
+  private transient RuntimeContext runtimeContext;
+  private transient StateRequestHandler stateRequestHandler;
+  private transient StageBundleFactory stageBundleFactory;
+  private transient AutoCloseable distributedCacheCloser;
 
   public FlinkExecutableStageFunction(
       RunnerApi.ExecutableStagePayload stagePayload,
-      Struct pipelineOptions,
-      Map<String, Integer> outputMap) {
+      JobInfo jobInfo,
+      Map<String, Integer> outputMap,
+      FlinkExecutableStageContext.Factory contextFactory) {
     this.stagePayload = stagePayload;
-    this.pipelineOptions = pipelineOptions;
+    this.jobInfo = jobInfo;
     this.outputMap = outputMap;
+    this.contextFactory = contextFactory;
   }
 
   @Override
   public void open(Configuration parameters) throws Exception {
-    throw new UnsupportedOperationException();
+    ExecutableStage executableStage = 
ExecutableStage.fromPayload(stagePayload);
+    runtimeContext = getRuntimeContext();
+    // TODO: Wire this into the distributed cache and make it pluggable.
+    ArtifactSource artifactSource = null;
+    // TODO: Do we really want this layer of indirection when accessing the 
stage bundle factory?
+    // It's a little strange because this operator is responsible for the 
lifetime of the stage
+    // bundle "factory" (manager?) but not the job or Flink bundle factories. 
How do we make
+    // ownership of the higher level "factories" explicit? Do we care?
+    FlinkExecutableStageContext stageContext = contextFactory.get(jobInfo);
+    ArtifactSourcePool cachePool = stageContext.getArtifactSourcePool();
+    distributedCacheCloser = cachePool.addToPool(artifactSource);
+    // NOTE: It's safe to reuse the state handler between partitions because 
each partition uses the
+    // same backing runtime context and broadcast variables. We use checkState 
below to catch errors
+    // in backward-incompatible Flink changes.
+    stateRequestHandler = stageContext.getStateRequestHandler(executableStage, 
runtimeContext);
+    stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
   }
 
   @Override
   public void mapPartition(
       Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> 
collector)
       throws Exception {
-    throw new UnsupportedOperationException();
+    checkState(
+        runtimeContext == getRuntimeContext(),
+        "RuntimeContext changed from under us. State handler invalid.");
+    checkState(
+        stageBundleFactory != null, "%s not yet prepared", 
StageBundleFactory.class.getName());
+    checkState(
+        stateRequestHandler != null, "%s not yet prepared", 
StateRequestHandler.class.getName());
+
+    try (RemoteBundle<InputT> bundle =
+        stageBundleFactory.getBundle(
+            new ReceiverFactory(collector, outputMap), stateRequestHandler)) {
+      FnDataReceiver<WindowedValue<InputT>> receiver = 
bundle.getInputReceiver();
+      for (WindowedValue<InputT> input : iterable) {
+        receiver.accept(input);
+      }
+    }
+    // NOTE: RemoteBundle.close() blocks on completion of all data receivers. 
This is necessary to
+    // safely reference the partition-scoped Collector from receivers.
   }
 
   @Override
   public void close() throws Exception {
-    throw new UnsupportedOperationException();
+    try (AutoCloseable cacheCloser = distributedCacheCloser;
+        AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
+  }
+
+  /**
+   * Receiver factory that wraps outgoing elements with the corresponding 
union tag for a
+   * multiplexed PCollection.
+   */
+  private static class ReceiverFactory implements OutputReceiverFactory {
+
+    private final Object collectorLock = new Object();
+
+    @GuardedBy("collectorLock")
+    private final Collector<RawUnionValue> collector;
+
+    private final Map<String, Integer> outputMap;
+
+    ReceiverFactory(Collector<RawUnionValue> collector, Map<String, Integer> 
outputMap) {
+      this.collector = collector;
+      this.outputMap = outputMap;
+    }
+
+    @Override
+    public <OutputT> FnDataReceiver<OutputT> create(String collectionId) {
+      Integer unionTag = outputMap.get(collectionId);
+      checkArgument(unionTag != null, "Unknown PCollection id: %s", 
collectionId);
+      int tagInt = unionTag;
+      return (receivedElement) -> {
+        synchronized (collectorLock) {
+          collector.collect(new RawUnionValue(tagInt, receivedElement));
+        }
+      };
+    }
   }
 }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
new file mode 100644
index 00000000000..4b5b5cdcfae
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link FlinkExecutableStageFunction}. */
+@RunWith(JUnit4.class)
+public class FlinkExecutableStageFunctionTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private RuntimeContext runtimeContext;
+  @Mock private DistributedCache distributedCache;
+  @Mock private Collector<RawUnionValue> collector;
+  @Mock private FlinkExecutableStageContext stageContext;
+  @Mock private StageBundleFactory stageBundleFactory;
+  @Mock private ArtifactSourcePool artifactSourcePool;
+  @Mock private StateRequestHandler stateRequestHandler;
+
+  // NOTE: ExecutableStage.fromPayload expects exactly one input, so we 
provide one here. These unit
+  // tests in general ignore the executable stage itself and mock around it.
+  private final ExecutableStagePayload stagePayload =
+      ExecutableStagePayload.newBuilder()
+          .setInput("input")
+          .setComponents(
+              Components.newBuilder()
+                  .putPcollections("input", PCollection.getDefaultInstance())
+                  .build())
+          .build();
+  private final JobInfo jobInfo = JobInfo.create("job-id", "job-name", 
Struct.getDefaultInstance());
+
+  @Before
+  public void setUpMocks() {
+    MockitoAnnotations.initMocks(this);
+    when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
+    when(stageContext.getArtifactSourcePool()).thenReturn(artifactSourcePool);
+    when(stageContext.getStateRequestHandler(any(), 
any())).thenReturn(stateRequestHandler);
+    
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+  }
+
+  @Test
+  public void sdkErrorsSurfaceOnClose() throws Exception {
+    FlinkExecutableStageFunction<Integer> function = 
getFunction(Collections.emptyMap());
+    function.open(new Configuration());
+
+    @SuppressWarnings("unchecked")
+    RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+    when(stageBundleFactory.<Integer>getBundle(any(), 
any())).thenReturn(bundle);
+
+    @SuppressWarnings("unchecked")
+    FnDataReceiver<WindowedValue<Integer>> receiver = 
Mockito.mock(FnDataReceiver.class);
+    when(bundle.getInputReceiver()).thenReturn(receiver);
+
+    Exception expected = new Exception();
+    doThrow(expected).when(bundle).close();
+    thrown.expect(is(expected));
+    function.mapPartition(Collections.emptyList(), collector);
+  }
+
+  @Test
+  public void checksForRuntimeContextChanges() throws Exception {
+    FlinkExecutableStageFunction<Integer> function = 
getFunction(Collections.emptyMap());
+    function.open(new Configuration());
+    // Change runtime context.
+    function.setRuntimeContext(Mockito.mock(RuntimeContext.class));
+    thrown.expect(Matchers.instanceOf(IllegalStateException.class));
+    function.mapPartition(Collections.emptyList(), collector);
+  }
+
+  @Test
+  public void expectedInputsAreSent() throws Exception {
+    FlinkExecutableStageFunction<Integer> function = 
getFunction(Collections.emptyMap());
+    function.open(new Configuration());
+
+    @SuppressWarnings("unchecked")
+    RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+    when(stageBundleFactory.<Integer>getBundle(any(), 
any())).thenReturn(bundle);
+
+    @SuppressWarnings("unchecked")
+    FnDataReceiver<WindowedValue<Integer>> receiver = 
Mockito.mock(FnDataReceiver.class);
+    when(bundle.getInputReceiver()).thenReturn(receiver);
+
+    WindowedValue<Integer> one = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Integer> two = WindowedValue.valueInGlobalWindow(2);
+    WindowedValue<Integer> three = WindowedValue.valueInGlobalWindow(3);
+    function.mapPartition(Arrays.asList(one, two, three), collector);
+
+    verify(receiver).accept(one);
+    verify(receiver).accept(two);
+    verify(receiver).accept(three);
+    verifyNoMoreInteractions(receiver);
+  }
+
+  @Test
+  public void outputsAreTaggedCorrectly() throws Exception {
+    WindowedValue<Integer> three = WindowedValue.valueInGlobalWindow(3);
+    WindowedValue<Integer> four = WindowedValue.valueInGlobalWindow(4);
+    WindowedValue<Integer> five = WindowedValue.valueInGlobalWindow(5);
+    Map<String, Integer> outputTagMap =
+        ImmutableMap.of(
+            "one", 1,
+            "two", 2,
+            "three", 3);
+
+    // We use a real StageBundleFactory here in order to exercise the output 
receiver factory.
+    StageBundleFactory stageBundleFactory =
+        new StageBundleFactory() {
+          @Override
+          public <InputT> RemoteBundle<InputT> getBundle(
+              OutputReceiverFactory receiverFactory, StateRequestHandler 
stateRequestHandler)
+              throws Exception {
+            return new RemoteBundle<InputT>() {
+              @Override
+              public String getId() {
+                return "bundle-id";
+              }
+
+              @Override
+              public FnDataReceiver<WindowedValue<InputT>> getInputReceiver() {
+                return new FnDataReceiver<WindowedValue<InputT>>() {
+                  @Override
+                  public void accept(WindowedValue<InputT> input) throws 
Exception {
+                    // Ignore input
+                  }
+                };
+              }
+
+              @Override
+              public void close() throws Exception {
+                // Emit all values to the runner when the bundle is closed.
+                receiverFactory.create("one").accept(three);
+                receiverFactory.create("two").accept(four);
+                receiverFactory.create("three").accept(five);
+              }
+            };
+          }
+
+          @Override
+          public void close() throws Exception {}
+        };
+    // Wire the stage bundle factory into our context.
+    
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+
+    FlinkExecutableStageFunction<Integer> function = getFunction(outputTagMap);
+    function.open(new Configuration());
+
+    function.mapPartition(Collections.emptyList(), collector);
+    // Ensure that the tagged values sent to the collector have the correct 
union tags as specified
+    // in the output map.
+    verify(collector).collect(new RawUnionValue(1, three));
+    verify(collector).collect(new RawUnionValue(2, four));
+    verify(collector).collect(new RawUnionValue(3, five));
+    verifyNoMoreInteractions(collector);
+  }
+
+  @Test
+  public void testStageBundleClosed() throws Exception {
+    FlinkExecutableStageFunction<Integer> function = 
getFunction(Collections.emptyMap());
+    function.open(new Configuration());
+    function.close();
+    verify(stageBundleFactory).close();
+    verifyNoMoreInteractions(stageBundleFactory);
+  }
+
+  /**
+   * Creates a {@link FlinkExecutableStageFunction}. Sets the runtime context 
to {@link
+   * #runtimeContext}. The context factory is mocked to return {@link 
#stageContext} every time. The
+   * behavior of the stage context itself is unchanged.
+   */
+  private FlinkExecutableStageFunction<Integer> getFunction(Map<String, 
Integer> outputMap) {
+    FlinkExecutableStageContext.Factory contextFactory =
+        Mockito.mock(FlinkExecutableStageContext.Factory.class);
+    when(contextFactory.get(any())).thenReturn(stageContext);
+    FlinkExecutableStageFunction<Integer> function =
+        new FlinkExecutableStageFunction<Integer>(stagePayload, jobInfo, 
outputMap, contextFactory);
+    function.setRuntimeContext(runtimeContext);
+    return function;
+  }
+}
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java
index ae818ca0726..9325fcf5f5f 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/OutputReceiverFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.fnexecution.control;
 
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
 /**
  * A factory that can create output receivers during an executable stage.
  */
@@ -25,7 +27,7 @@
   /**
    * Get a new {@link RemoteOutputReceiver} for an output PCollection.
    *
-   * This call should only be invoked once per PCollectionId per factory.
+   * <p>This call should only be invoked once per PCollection id per factory.
    */
-  <OutputT> RemoteOutputReceiver<OutputT> create(String pCollectionId);
+  <OutputT> FnDataReceiver<OutputT> create(String pCollectionId);
 }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/provisioning/JobInfo.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/provisioning/JobInfo.java
index 0c1f8b94181..374ffeb89ed 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/provisioning/JobInfo.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/provisioning/JobInfo.java
@@ -20,13 +20,14 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.protobuf.Struct;
+import java.io.Serializable;
 
 /**
  * A subset of {@link 
org.apache.beam.model.fnexecution.v1.ProvisionApi.ProvisionInfo} that
  * specifies a unique job, while omitting fields that are not known to the 
runner operator.
  */
 @AutoValue
-public abstract class JobInfo {
+public abstract class JobInfo implements Serializable {
   public static JobInfo create(String jobId, String jobName, Struct 
pipelineOptions) {
     return new AutoValue_JobInfo(jobId, jobName, pipelineOptions);
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101323)
    Time Spent: 4h 10m  (was: 4h)

> FlinkRunner ExecutableStage batch operator
> ------------------------------------------
>
>                 Key: BEAM-2597
>                 URL: https://issues.apache.org/jira/browse/BEAM-2597
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Kenneth Knowles
>            Assignee: Ben Sidhom
>            Priority: Major
>              Labels: portability
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> This operator will execute user code in the context of an SDK harness by 
> constructing a ProcessBundleDescriptor from an ExecutableStage (physical 
> stage plan) and sending instructions/elements over the control and data 
> planes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to