[ 
https://issues.apache.org/jira/browse/BEAM-4297?focusedWorklogId=106770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106770
 ]

ASF GitHub Bot logged work on BEAM-4297:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/May/18 19:02
            Start Date: 29/May/18 19:02
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #5407: [BEAM-4297] Streaming 
executable stage translation and operator for portable Flink runner.
URL: https://github.com/apache/beam/pull/5407
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 72fe10e22c0..b7ee08378dc 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -20,7 +20,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableBiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -56,6 +55,7 @@
 import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
@@ -319,7 +319,8 @@ public void translate(BatchTranslationContext context, 
RunnerApi.Pipeline pipeli
     RunnerApi.Components components = pipeline.getComponents();
     Map<String, String> outputs = transform.getTransform().getOutputsMap();
     // Mapping from PCollection id to coder tag id.
-    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+    BiMap<String, Integer> outputMap =
+            FlinkPipelineTranslatorUtils.createOutputMap(outputs.values());
     // Collect all output Coders and create a UnionCoder for our tagged 
outputs.
     List<Coder<?>> unionCoders = Lists.newArrayList();
     // Enforce tuple tag sorting by union tag index.
@@ -615,14 +616,4 @@ private static void pruneOutput(
     context.addDataSet(collectionId, pruningOperator);
   }
 
-  /**  Creates a mapping from PCollection id to output tag integer. */
-  private static BiMap<String, Integer> createOutputMap(Iterable<String> 
localOutputs) {
-    ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
-    int outputIndex = 0;
-    for (String tag : localOutputs) {
-      builder.put(tag, outputIndex);
-      outputIndex++;
-    }
-    return builder.build();
-  }
 }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index b7bc8ca1a40..ff991dce7ed 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -17,15 +17,20 @@
  */
 package org.apache.beam.runners.flink;
 
+import com.google.common.collect.BiMap;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -36,8 +41,11 @@
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
@@ -67,6 +75,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
 
 /**
  * Translate an unbounded portable pipeline representation into a Flink 
pipeline representation.
@@ -423,8 +432,99 @@ private void translateImpulse(
           String id,
           RunnerApi.Pipeline pipeline,
           StreamingTranslationContext context) {
+    // TODO: Fail on stateful DoFns for now.
+    // TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+    // TODO: Fail on splittable DoFns.
+    // TODO: Special-case single outputs to avoid multiplexing PCollections.
+    RunnerApi.Components components = pipeline.getComponents();
+    RunnerApi.PTransform transform = components.getTransformsOrThrow(id);
+    Map<String, String> outputs = transform.getOutputsMap();
+
+    final RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload = 
RunnerApi.ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    String inputPCollectionId =
+            Iterables.getOnlyElement(transform.getInputsMap().values());
+
+    Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = 
Maps.newLinkedHashMap();
+    Map<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders = 
Maps.newLinkedHashMap();
+    // TODO: does it matter which output we designate as "main"
+    TupleTag<OutputT> mainOutputTag;
+    if (!outputs.isEmpty()) {
+      mainOutputTag = new TupleTag(outputs.keySet().iterator().next());
+    } else {
+      mainOutputTag = null;
+    }
+
+    // associate output tags with ids, output manager uses these Integer ids 
to serialize state
+    BiMap<String, Integer> outputIndexMap =
+            FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+    Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
+    Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
+    // order output names for deterministic mapping
+    for (String localOutputName : new TreeMap<>(outputIndexMap).keySet()) {
+      String collectionId = outputs.get(localOutputName);
+      Coder<WindowedValue<?>> windowCoder = (Coder) 
instantiateCoder(collectionId, components);
+      outputCoders.put(localOutputName, windowCoder);
+      TupleTag<?> tupleTag = new TupleTag<>(localOutputName);
+      CoderTypeInformation<WindowedValue<?>> typeInformation =
+              new CoderTypeInformation(windowCoder);
+      tagsToOutputTags.put(tupleTag, new OutputTag<>(localOutputName, 
typeInformation));
+      tagsToCoders.put(tupleTag, windowCoder);
+      tagsToIds.put(tupleTag, outputIndexMap.get(localOutputName));
+    }
+
+    final SingleOutputStreamOperator<WindowedValue<OutputT>> outputStream;
+    DataStream<WindowedValue<InputT>> inputDataStream = 
context.getDataStreamOrThrow(
+            inputPCollectionId);
+
+    // TODO: coder for side input push back
+    final Coder<WindowedValue<InputT>> inputCoder = null;
+    CoderTypeInformation<WindowedValue<OutputT>> outputTypeInformation = 
(!outputs.isEmpty())
+            ? new 
CoderTypeInformation(outputCoders.get(mainOutputTag.getId())) : null;
+
+    ArrayList<TupleTag<?>> additionalOutputTags = Lists.newArrayList();
+    for (TupleTag<?> tupleTag : tagsToCoders.keySet()) {
+      if (!mainOutputTag.getId().equals(tupleTag.getId())) {
+        additionalOutputTags.add(tupleTag);
+      }
+    }
 
-    throw new RuntimeException("executable stage translation not implemented");
+    DoFnOperator.MultiOutputOutputManagerFactory<OutputT> outputManagerFactory 
=
+            new DoFnOperator.MultiOutputOutputManagerFactory<>(
+                    mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds);
+
+    // TODO: side inputs
+    DoFnOperator<InputT, OutputT> doFnOperator =
+            new ExecutableStageDoFnOperator<InputT, OutputT>(
+                    transform.getUniqueName(),
+                    inputCoder,
+                    mainOutputTag,
+                    additionalOutputTags,
+                    outputManagerFactory,
+                    Collections.emptyMap() /* sideInputTagMapping */,
+                    Collections.emptyList() /* sideInputs */,
+                    context.getPipelineOptions(),
+                    stagePayload,
+                    context.getJobInfo(),
+                    FlinkExecutableStageContext.batchFactory()
+            );
+
+    outputStream = inputDataStream
+            .transform(transform.getUniqueName(), outputTypeInformation, 
doFnOperator);
+
+    if (mainOutputTag != null) {
+      context.addDataStream(outputs.get(mainOutputTag.getId()), outputStream);
+    }
+
+    for (TupleTag<?> tupleTag : additionalOutputTags) {
+      context.addDataStream(outputs.get(tupleTag.getId()),
+              outputStream.getSideOutput(tagsToOutputTags.get(tupleTag)));
+    }
 
   }
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
new file mode 100644
index 00000000000..ebd40ecb67f
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.utils;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.Sets;
+
+/**
+ * Utilities for pipeline translation.
+ */
+public final class FlinkPipelineTranslatorUtils {
+  private FlinkPipelineTranslatorUtils() {}
+
+  /**  Creates a mapping from PCollection id to output tag integer. */
+  public static BiMap<String, Integer> createOutputMap(Iterable<String> 
localOutputs) {
+    ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
+    int outputIndex = 0;
+    // sort localOutputs for stable indexing
+    for (String tag : Sets.newTreeSet(localOutputs)) {
+      builder.put(tag, outputIndex);
+      outputIndex++;
+    }
+    return builder.build();
+  }
+
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
new file mode 100644
index 00000000000..fc84e4259b3
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+
+/**
+ * ExecutableStageDoFnOperator basic functional implementation without side 
inputs and user state.
+ * SDK harness interaction code adopted from
+ * {@link 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction}.
+ * TODO: Evaluate reuse
+ * All operators in the non-portable streaming translation are based on {@link 
DoFnOperator}.
+ * This implies dependency on {@link DoFnRunner}, which is not required for 
portable pipeline.
+ * TODO: Multiple element bundle execution
+ * The operator (like old non-portable runner) executes every element as 
separate bundle,
+ * which will be even more expensive with SDK harness container.
+ * Refactor for above should be looked into once streaming side inputs (and 
push back) take
+ * shape.
+ * @param <InputT>
+ * @param <OutputT>
+ */
+public class ExecutableStageDoFnOperator<InputT, OutputT> extends 
DoFnOperator<InputT, OutputT> {
+
+  private static final Logger logger =
+          Logger.getLogger(ExecutableStageDoFnOperator.class.getName());
+
+  private final RunnerApi.ExecutableStagePayload payload;
+  private final JobInfo jobInfo;
+  private final FlinkExecutableStageContext.Factory contextFactory;
+  private final Map<String, TupleTag<?>> outputMap;
+
+  private transient StateRequestHandler stateRequestHandler;
+  private transient StageBundleFactory stageBundleFactory;
+  private transient AutoCloseable distributedCacheCloser;
+
+  public ExecutableStageDoFnOperator(String stepName,
+                                     Coder<WindowedValue<InputT>> inputCoder,
+                                     TupleTag<OutputT> mainOutputTag,
+                                     List<TupleTag<?>> additionalOutputTags,
+                                     OutputManagerFactory<OutputT> 
outputManagerFactory,
+                                     Map<Integer, PCollectionView<?>> 
sideInputTagMapping,
+                                     Collection<PCollectionView<?>> sideInputs,
+                                     PipelineOptions options,
+                                     RunnerApi.ExecutableStagePayload payload,
+                                     JobInfo jobInfo,
+                                     FlinkExecutableStageContext.Factory 
contextFactory
+                                     ) {
+    super(new NoOpDoFn(),
+            stepName, inputCoder, mainOutputTag, additionalOutputTags,
+            outputManagerFactory, WindowingStrategy.globalDefault() /* unused 
*/,
+            sideInputTagMapping, sideInputs, options, null /*keyCoder*/);
+      this.payload = payload;
+      this.jobInfo = jobInfo;
+      this.contextFactory = contextFactory;
+      this.outputMap = createOutputMap(mainOutputTag, additionalOutputTags);
+  }
+
+  private static Map<String, TupleTag<?>> createOutputMap(TupleTag mainOutput,
+                                                          List<TupleTag<?>> 
additionalOutputs) {
+      Map<String, TupleTag<?>> outputMap = new 
HashMap<>(additionalOutputs.size() + 1);
+      if (mainOutput != null) {
+        outputMap.put(mainOutput.getId(), mainOutput);
+      }
+      for (TupleTag<?> additionalTag : additionalOutputs) {
+        outputMap.put(additionalTag.getId(), additionalTag);
+      }
+      return outputMap;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+
+    ExecutableStage executableStage = ExecutableStage.fromPayload(payload);
+    // TODO: Wire this into the distributed cache and make it pluggable.
+    ArtifactSource artifactSource = null;
+    // TODO: Do we really want this layer of indirection when accessing the 
stage bundle factory?
+    // It's a little strange because this operator is responsible for the 
lifetime of the stage
+    // bundle "factory" (manager?) but not the job or Flink bundle factories. 
How do we make
+    // ownership of the higher level "factories" explicit? Do we care?
+    FlinkExecutableStageContext stageContext = contextFactory.get(jobInfo);
+    ArtifactSourcePool cachePool = stageContext.getArtifactSourcePool();
+    distributedCacheCloser = cachePool.addToPool(artifactSource);
+    // NOTE: It's safe to reuse the state handler between partitions because 
each partition uses the
+    // same backing runtime context and broadcast variables. We use checkState 
below to catch errors
+    // in backward-incompatible Flink changes.
+    stateRequestHandler = stageContext.getStateRequestHandler(executableStage, 
getRuntimeContext());
+    stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
+
+  }
+
+  // TODO: currently assumes that every element is a separate bundle,
+  // but this can be changed by pushing some of this logic into the 
"DoFnRunner"
+  private void processElementWithSdkHarness(WindowedValue<InputT> element) 
throws Exception {
+    checkState(stageBundleFactory != null, "%s not yet prepared",
+            StageBundleFactory.class.getName());
+    checkState(stateRequestHandler != null, "%s not yet prepared",
+            StateRequestHandler.class.getName());
+
+    try (RemoteBundle<InputT> bundle =
+                 stageBundleFactory.getBundle(
+                         new ReceiverFactory(outputManager, outputMap), 
stateRequestHandler)) {
+      logger.finer(String.format("Sending value: %s", element));
+      bundle.getInputReceiver().accept(element);
+    }
+
+  }
+
+  @Override
+  public void close() throws Exception {
+    try (AutoCloseable cacheCloser = distributedCacheCloser;
+         AutoCloseable bundleFactoryCloser = stageBundleFactory) {}
+    super.close();
+  }
+
+  // TODO: remove single element bundle assumption
+  @Override
+  protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(
+          DoFnRunner<InputT, OutputT> wrappedRunner) {
+    return new SdkHarnessDoFnRunner();
+  }
+
+  private class SdkHarnessDoFnRunner implements DoFnRunner<InputT, OutputT> {
+    @Override
+    public void startBundle() {}
+
+    @Override
+    public void processElement(WindowedValue<InputT> elem) {
+      try {
+        processElementWithSdkHarness(elem);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void onTimer(String timerId, BoundedWindow window, Instant 
timestamp,
+                        TimeDomain timeDomain) {
+    }
+
+    @Override
+    public void finishBundle() {}
+
+    @Override
+    public DoFn<InputT, OutputT> getFn() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> 
{
+    @ProcessElement
+    public void doNothing(ProcessContext context) {}
+  }
+
+  /**
+   * Receiver factory that wraps outgoing elements with the corresponding 
union tag for a
+   * multiplexed PCollection.
+   */
+  private static class ReceiverFactory implements OutputReceiverFactory {
+
+    private final Object collectorLock = new Object();
+
+    @GuardedBy("collectorLock")
+    private final BufferedOutputManager<RawUnionValue> collector;
+
+    private final Map<String, TupleTag<?>> outputMap;
+
+    ReceiverFactory(BufferedOutputManager collector, Map<String, TupleTag<?>> 
outputMap) {
+      this.collector = collector;
+      this.outputMap = outputMap;
+    }
+
+    @Override
+    public <OutputT> FnDataReceiver<OutputT> create(String collectionId) {
+      return (receivedElement) -> {
+        synchronized (collectorLock) {
+          collector.output(outputMap.get(collectionId), (WindowedValue) 
receivedElement);
+        }
+      };
+    }
+  }
+
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
new file mode 100644
index 00000000000..95f4f1741ba
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Struct;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.runners.flink.ArtifactSourcePool;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ExecutableStageDoFnOperator}. */
+@RunWith(JUnit4.class)
+public class ExecutableStageDoFnOperatorTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private RuntimeContext runtimeContext;
+  @Mock private DistributedCache distributedCache;
+  @Mock private FlinkExecutableStageContext stageContext;
+  @Mock private StageBundleFactory stageBundleFactory;
+  @Mock private ArtifactSourcePool artifactSourcePool;
+  @Mock private StateRequestHandler stateRequestHandler;
+
+  // NOTE: ExecutableStage.fromPayload expects exactly one input, so we 
provide one here. These unit
+  // tests in general ignore the executable stage itself and mock around it.
+  private final ExecutableStagePayload stagePayload =
+      ExecutableStagePayload.newBuilder()
+          .setInput("input")
+          .setComponents(
+              Components.newBuilder()
+                  .putPcollections("input", PCollection.getDefaultInstance())
+                  .build())
+          .build();
+  private final JobInfo jobInfo = JobInfo.create("job-id", "job-name", 
Struct.getDefaultInstance());
+
+  @Before
+  public void setUpMocks() {
+    MockitoAnnotations.initMocks(this);
+    when(runtimeContext.getDistributedCache()).thenReturn(distributedCache);
+    when(stageContext.getArtifactSourcePool()).thenReturn(artifactSourcePool);
+    when(stageContext.getStateRequestHandler(any(), 
any())).thenReturn(stateRequestHandler);
+    
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+  }
+
+  @Test
+  public void sdkErrorsSurfaceOnClose() throws Exception {
+    TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
+    DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
+            new DoFnOperator.MultiOutputOutputManagerFactory(
+                    mainOutput, VoidCoder.of());
+    ExecutableStageDoFnOperator<Integer, Integer> operator = 
getOperator(mainOutput,
+            Collections.emptyList(),
+            outputManagerFactory);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, 
WindowedValue<Integer>> testHarness =
+            new OneInputStreamOperatorTestHarness<>(operator);
+
+    testHarness.open();
+
+    @SuppressWarnings("unchecked")
+    RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+    when(stageBundleFactory.getBundle(any(), any())).thenReturn(bundle);
+
+    @SuppressWarnings("unchecked")
+    FnDataReceiver<WindowedValue<Integer>> receiver = 
Mockito.mock(FnDataReceiver.class);
+    when(bundle.getInputReceiver()).thenReturn(receiver);
+
+    Exception expected = new Exception();
+    doThrow(expected).when(bundle).close();
+    thrown.expectCause(is(expected));
+
+    operator.processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow(0)));
+  }
+
+  @Test
+  public void expectedInputsAreSent() throws Exception {
+    TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
+    DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
+            new DoFnOperator.MultiOutputOutputManagerFactory(
+                    mainOutput, VoidCoder.of());
+    ExecutableStageDoFnOperator<Integer, Integer> operator = 
getOperator(mainOutput,
+            Collections.emptyList(),
+            outputManagerFactory
+    );
+
+    @SuppressWarnings("unchecked")
+    RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+    when(stageBundleFactory.getBundle(any(), any())).thenReturn(bundle);
+
+    @SuppressWarnings("unchecked")
+    FnDataReceiver<WindowedValue<Integer>> receiver = 
Mockito.mock(FnDataReceiver.class);
+    when(bundle.getInputReceiver()).thenReturn(receiver);
+
+    WindowedValue<Integer> one = WindowedValue.valueInGlobalWindow(1);
+    WindowedValue<Integer> two = WindowedValue.valueInGlobalWindow(2);
+    WindowedValue<Integer> three = WindowedValue.valueInGlobalWindow(3);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, 
WindowedValue<Integer>> testHarness =
+            new OneInputStreamOperatorTestHarness<>(operator);
+
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(one));
+    testHarness.processElement(new StreamRecord<>(two));
+    testHarness.processElement(new StreamRecord<>(three));
+
+    verify(receiver).accept(one);
+    verify(receiver).accept(two);
+    verify(receiver).accept(three);
+    verifyNoMoreInteractions(receiver);
+
+    testHarness.close();
+  }
+
+  @Test
+  public void outputsAreTaggedCorrectly() throws Exception {
+
+    WindowedValue.ValueOnlyWindowedValueCoder<Integer> coder =
+            WindowedValue.getValueOnlyCoder(VarIntCoder.of());
+
+    TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
+    TupleTag<Integer> additionalOutput1 = new TupleTag<>("output-1");
+    TupleTag<Integer> additionalOutput2 = new TupleTag<>("output-2");
+    ImmutableMap<TupleTag<?>, OutputTag<?>> tagsToOutputTags =
+            ImmutableMap.<TupleTag<?>, OutputTag<?>>builder()
+                    .put(additionalOutput1, new 
OutputTag<String>(additionalOutput1.getId()){})
+                    .put(additionalOutput2, new 
OutputTag<String>(additionalOutput2.getId()){})
+                    .build();
+    ImmutableMap<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders =
+            ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
+                    .put(mainOutput, (Coder) coder)
+                    .put(additionalOutput1, coder)
+                    .put(additionalOutput2, coder)
+                    .build();
+    ImmutableMap<TupleTag<?>, Integer> tagsToIds =
+            ImmutableMap.<TupleTag<?>, Integer>builder()
+                    .put(mainOutput, 0)
+                    .put(additionalOutput1, 1)
+                    .put(additionalOutput2, 2)
+                    .build();
+
+    DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
+            new DoFnOperator.MultiOutputOutputManagerFactory(
+                    mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
+
+    WindowedValue<Integer> zero = WindowedValue.valueInGlobalWindow(0);
+    WindowedValue<Integer> three = WindowedValue.valueInGlobalWindow(3);
+    WindowedValue<Integer> four = WindowedValue.valueInGlobalWindow(4);
+    WindowedValue<Integer> five = WindowedValue.valueInGlobalWindow(5);
+
+    // We use a real StageBundleFactory here in order to exercise the output 
receiver factory.
+    StageBundleFactory<Void> stageBundleFactory =
+        new StageBundleFactory<Void>() {
+          @Override
+          public RemoteBundle<Void> getBundle(
+              OutputReceiverFactory receiverFactory, StateRequestHandler 
stateRequestHandler) {
+            return new RemoteBundle<Void>() {
+              @Override
+              public String getId() {
+                return "bundle-id";
+              }
+
+              @Override
+              public FnDataReceiver<WindowedValue<Void>> getInputReceiver() {
+                return input -> {/* Ignore input*/};
+              }
+
+              @Override
+              public void close() throws Exception {
+                // Emit all values to the runner when the bundle is closed.
+                receiverFactory.create(mainOutput.getId()).accept(three);
+                receiverFactory.create(additionalOutput1.getId()).accept(four);
+                receiverFactory.create(additionalOutput2.getId()).accept(five);
+              }
+            };
+          }
+
+          @Override
+          public void close() {}
+        };
+    // Wire the stage bundle factory into our context.
+    
when(stageContext.getStageBundleFactory(any())).thenReturn(stageBundleFactory);
+
+    ExecutableStageDoFnOperator<Integer, Integer> operator = 
getOperator(mainOutput,
+            ImmutableList.of(additionalOutput1, additionalOutput2),
+            outputManagerFactory
+    );
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, 
WindowedValue<Integer>> testHarness =
+            new OneInputStreamOperatorTestHarness<>(operator);
+
+    testHarness.open();
+
+    testHarness.processElement(new StreamRecord<>(zero));
+
+    assertThat(testHarness.getOutput(),
+            contains(new StreamRecord<>(three)));
+
+    
assertThat(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1)),
+            contains(new StreamRecord<>(four)));
+
+    
assertThat(testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput2)),
+            contains(new StreamRecord<>(five)));
+
+    testHarness.close();
+
+  }
+
+  @Test
+  public void testStageBundleClosed() throws Exception {
+    TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
+    DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
+            new DoFnOperator.MultiOutputOutputManagerFactory(
+                    mainOutput, VoidCoder.of());
+    ExecutableStageDoFnOperator<Integer, Integer> operator = 
getOperator(mainOutput,
+            Collections.emptyList(),
+            outputManagerFactory
+    );
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, 
WindowedValue<Integer>> testHarness =
+            new OneInputStreamOperatorTestHarness<>(operator);
+    testHarness.open();
+
+    operator.close();
+    verify(stageBundleFactory).close();
+    verifyNoMoreInteractions(stageBundleFactory);
+
+    testHarness.close();
+  }
+
+  @Test
+  public void testSerialization() {
+    WindowedValue.ValueOnlyWindowedValueCoder<Integer> coder =
+            WindowedValue.getValueOnlyCoder(VarIntCoder.of());
+
+    TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
+    TupleTag<Integer> additionalOutput = new TupleTag<>("additional-output");
+    ImmutableMap<TupleTag<?>, OutputTag<?>> tagsToOutputTags =
+            ImmutableMap.<TupleTag<?>, OutputTag<?>>builder()
+                    .put(additionalOutput, new 
OutputTag<>(additionalOutput.getId(),
+                            TypeInformation.of(Integer.class)))
+                    .build();
+    ImmutableMap<TupleTag<?>, Coder<WindowedValue<?>>> tagsToCoders =
+            ImmutableMap.<TupleTag<?>, Coder<WindowedValue<?>>>builder()
+                    .put(mainOutput, (Coder) coder)
+                    .put(additionalOutput, coder)
+                    .build();
+    ImmutableMap<TupleTag<?>, Integer> tagsToIds =
+            ImmutableMap.<TupleTag<?>, Integer>builder()
+                    .put(mainOutput, 0)
+                    .put(additionalOutput, 1)
+                    .build();
+
+    DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory 
=
+            new DoFnOperator.MultiOutputOutputManagerFactory(
+                    mainOutput, tagsToOutputTags, tagsToCoders, tagsToIds);
+
+    ExecutableStageDoFnOperator<Integer, Integer> operator =
+            new ExecutableStageDoFnOperator<>(
+                    "transform",
+                    null,
+                    mainOutput,
+                    ImmutableList.of(additionalOutput),
+                    outputManagerFactory,
+                    Collections.emptyMap() /* sideInputTagMapping */,
+                    Collections.emptyList() /* sideInputs */,
+                    PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+                    stagePayload,
+                    jobInfo,
+                    FlinkExecutableStageContext.batchFactory()
+            );
+
+    ExecutableStageDoFnOperator<Integer, Integer> clone = 
SerializationUtils.clone(operator);
+    assertNotNull(clone);
+    assertNotEquals(operator, clone);
+  }
+
+  /**
+   * Creates a {@link ExecutableStageDoFnOperator}. Sets the runtime context 
to {@link
+   * #runtimeContext}. The context factory is mocked to return {@link 
#stageContext} every time. The
+   * behavior of the stage context itself is unchanged.
+   */
+  private ExecutableStageDoFnOperator<Integer, Integer> 
getOperator(TupleTag<Integer> mainOutput,
+                  List<TupleTag<?>> additionalOutputs,
+                  DoFnOperator.MultiOutputOutputManagerFactory<Integer> 
outputManagerFactory) {
+
+    FlinkExecutableStageContext.Factory contextFactory =
+            Mockito.mock(FlinkExecutableStageContext.Factory.class);
+    when(contextFactory.get(any())).thenReturn(stageContext);
+
+    ExecutableStageDoFnOperator<Integer, Integer> operator =
+            new ExecutableStageDoFnOperator<>(
+                    "transform",
+                    null,
+                    mainOutput,
+                    additionalOutputs,
+                    outputManagerFactory,
+                    Collections.emptyMap() /* sideInputTagMapping */,
+                    Collections.emptyList() /* sideInputs */,
+                    PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+                    stagePayload,
+                    jobInfo,
+                    contextFactory
+            );
+
+    return operator;
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 106770)
    Time Spent: 4h  (was: 3h 50m)

> Flink portable runner executable stage operator for streaming
> -------------------------------------------------------------
>
>                 Key: BEAM-4297
>                 URL: https://issues.apache.org/jira/browse/BEAM-4297
>             Project: Beam
>          Issue Type: Task
>          Components: runner-flink
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Major
>              Labels: portability
>          Time Spent: 4h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to