[ https://issues.apache.org/jira/browse/BEAM-3972?focusedWorklogId=98037&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-98037 ]

ASF GitHub Bot logged work on BEAM-3972:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/May/18 21:01
            Start Date: 03/May/18 21:01
    Worklog Time Spent: 10m 
      Work Description: jkff closed pull request #5226: [BEAM-3972] Translate portable batch pipelines by proto
URL: https://github.com/apache/beam/pull/5226

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

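In summary, the change promotes SyntheticNodes to a public SyntheticComponents utility, extracts Flink environment setup into a new FlinkExecutionEnvironments class, and adds a FlinkBatchPortablePipelineTranslator that translates a bounded portable pipeline (a Runner API proto) directly into a Flink ExecutionEnvironment. A minimal driver sketch, mirroring the class javadoc included in the diff below; the FlinkPipelineOptions "options" and the RunnerApi.Pipeline "pipeline" are assumed to already be in scope:

    // Build the proto-based translator and a fresh batch translation context.
    FlinkBatchPortablePipelineTranslator translator =
        FlinkBatchPortablePipelineTranslator.createTranslator();
    FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
        FlinkBatchPortablePipelineTranslator.createTranslationContext(options);
    // Populate the context's ExecutionEnvironment with the translated DAG.
    translator.translate(context, pipeline);
    // The environment now holds the not-yet-executed Flink plan.
    ExecutionEnvironment executionEnvironment = context.getExecutionEnvironment();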

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
similarity index 87%
rename from runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
index fc2cb3dc562..f7adf6de5e1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SyntheticNodes.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SyntheticComponents.java
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.beam.runners.core.construction.graph;
+package org.apache.beam.runners.core.construction;
 
 import java.util.function.Predicate;
 
 /**
- * A utility class to interact with synthetic {@link PipelineNode Pipeline Nodes}.
+ * A utility class to interact with synthetic pipeline components.
  */
-class SyntheticNodes {
-  private SyntheticNodes() {}
+public class SyntheticComponents {
+  private SyntheticComponents() {}
 
   /**
    * Generate an ID which does not collide with any existing ID, as determined by the input
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
index ddc03355a90..df3aa5fdfb7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/FusedPipeline.java
@@ -30,6 +30,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
 /** A {@link Pipeline} which has been separated into collections of executable components. */
@@ -106,7 +107,7 @@ static FusedPipeline of(
       Set<String> usedNames =
           Sets.union(topLevelTransforms.keySet(), getComponents().getTransformsMap().keySet());
       topLevelTransforms.put(
-          SyntheticNodes.uniqueId(baseName, usedNames::contains), stage.toPTransform());
+          SyntheticComponents.uniqueId(baseName, usedNames::contains), stage.toPTransform());
     }
     return topLevelTransforms;
   }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
index 4419787ede1..45c4a27cdcf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/OutputDeduplicator.java
@@ -39,6 +39,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
@@ -131,7 +132,7 @@ static DeduplicationResult ensureSingleProducer(
       PTransform flattenPartialPCollections =
           createFlattenOfPartials(partialFlattenTargets.getKey(), partialFlattenTargets.getValue());
       String flattenId =
-          SyntheticNodes.uniqueId("unzipped_flatten", 
unzippedComponents::containsTransforms);
+          SyntheticComponents.uniqueId("unzipped_flatten", 
unzippedComponents::containsTransforms);
       unzippedComponents.putTransforms(flattenId, flattenPartialPCollections);
       introducedFlattens.add(PipelineNode.pTransform(flattenId, flattenPartialPCollections));
     }
@@ -257,7 +258,7 @@ public static StageDeduplication of(
             id ->
                 unzippedOutputs.values().stream().map(PCollectionNode::getId).anyMatch(id::equals));
     for (PCollectionNode duplicateOutput : duplicates) {
-      String id = SyntheticNodes.uniqueId(duplicateOutput.getId(), existingOrNewIds);
+      String id = SyntheticComponents.uniqueId(duplicateOutput.getId(), existingOrNewIds);
       PCollection partial = duplicateOutput.getPCollection().toBuilder().setUniqueName(id).build();
       // Check to make sure there is only one duplicated output with the same id - which ensures we
       // only introduce one 'partial output' per producer of that output.
diff --git a/runners/flink/build.gradle b/runners/flink/build.gradle
index 41feccad7c6..17d55e8856e 100644
--- a/runners/flink/build.gradle
+++ b/runners/flink/build.gradle
@@ -51,6 +51,7 @@ dependencies {
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-core-java", configuration: "shadow")
   shadow project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
+  shadow project(path: ":beam-runners-java-fn-execution", configuration: 
"shadow")
   shadow library.java.jackson_annotations
   shadow library.java.findbugs_jsr305
   shadow library.java.slf4j_api
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index ed583df8763..a0ab1f4c336 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -253,6 +253,17 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-java-fn-execution</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-model-pipeline</artifactId>
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
new file mode 100644
index 00000000000..d17e316f0ea
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
+import 
org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
+import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.KvKeySelector;
+import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.GroupCombineOperator;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
+import org.apache.flink.api.java.operators.Grouping;
+import org.apache.flink.api.java.operators.MapPartitionOperator;
+
+/**
+ * A translator that translates bounded portable pipelines into executable Flink pipelines.
+ *
+ * <p>Example usage:
+ *
+ * <pre>
+ *   FlinkBatchPortablePipelineTranslator translator =
+ *       FlinkBatchPortablePipelineTranslator.createTranslator();
+ *   BatchTranslationContext context =
+ *       FlinkBatchPortablePipelineTranslator.createTranslationContext(options);
+ *   translator.translate(context, pipeline);
+ *   ExecutionEnvironment executionEnvironment = context.getExecutionEnvironment();
+ *   // Do something with executionEnvironment...
+ * </pre>
+ *
+ * <p>After translation the {@link ExecutionEnvironment} in the translation context will contain the
+ * full not-yet-executed pipeline DAG corresponding to the input pipeline.
+ */
+public class FlinkBatchPortablePipelineTranslator
+    implements FlinkPortablePipelineTranslator<
+        FlinkBatchPortablePipelineTranslator.BatchTranslationContext> {
+
+  /**
+   * Creates a batch translation context. The resulting Flink execution dag will live in a new
+   * {@link ExecutionEnvironment}.
+   */
+  public static BatchTranslationContext 
createTranslationContext(FlinkPipelineOptions options) {
+    ExecutionEnvironment executionEnvironment =
+        FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
+    return new BatchTranslationContext(options, executionEnvironment);
+  }
+
+  /** Creates a batch translator. */
+  public static FlinkBatchPortablePipelineTranslator createTranslator() {
+    ImmutableMap.Builder<String, PTransformTranslator> translatorMap =
+        ImmutableMap.builder();
+    translatorMap.put(
+        PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        FlinkBatchPortablePipelineTranslator::translateFlatten);
+    translatorMap.put(
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        FlinkBatchPortablePipelineTranslator::translateGroupByKey);
+    translatorMap.put(
+        PTransformTranslation.IMPULSE_TRANSFORM_URN,
+        FlinkBatchPortablePipelineTranslator::translateImpulse);
+    translatorMap.put(
+        PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
+        FlinkBatchPortablePipelineTranslator::translateAssignWindows);
+    translatorMap.put(
+        ExecutableStage.URN, 
FlinkBatchPortablePipelineTranslator::translateExecutableStage);
+    translatorMap.put(
+        PTransformTranslation.RESHUFFLE_URN,
+        FlinkBatchPortablePipelineTranslator::translateReshuffle);
+    translatorMap.put(
+        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
+        FlinkBatchPortablePipelineTranslator::translateView);
+    return new FlinkBatchPortablePipelineTranslator(translatorMap.build());
+  }
+
+  /**
+   * Batch translation context. Stores metadata about known PCollections/DataSets and holds the
+   * flink {@link ExecutionEnvironment} that the execution plan will be applied to.
+   */
+  public static class BatchTranslationContext
+      implements FlinkPortablePipelineTranslator.TranslationContext {
+
+    private final FlinkPipelineOptions options;
+    private final ExecutionEnvironment executionEnvironment;
+    private final Map<String, DataSet<?>> dataSets;
+    private final Set<String> danglingDataSets;
+
+    private BatchTranslationContext(
+        FlinkPipelineOptions options, ExecutionEnvironment 
executionEnvironment) {
+      this.options = options;
+      this.executionEnvironment = executionEnvironment;
+      dataSets = new HashMap<>();
+      danglingDataSets = new HashSet<>();
+    }
+
+    @Override
+    public FlinkPipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    public ExecutionEnvironment getExecutionEnvironment() {
+      return executionEnvironment;
+    }
+
+    public <T> void addDataSet(String pCollectionId, DataSet<T> dataSet) {
+      checkArgument(!dataSets.containsKey(pCollectionId));
+      dataSets.put(pCollectionId, dataSet);
+      danglingDataSets.add(pCollectionId);
+    }
+
+    public <T> DataSet<T> getDataSetOrThrow(String pCollectionId) {
+      DataSet<T> dataSet = (DataSet<T>) dataSets.get(pCollectionId);
+      if (dataSet == null) {
+        throw new IllegalArgumentException(
+            String.format("Unknown dataset for id %s.", pCollectionId));
+      }
+
+      // Assume that the DataSet is consumed if requested. We use this as a proxy for consumption
+      // because Flink does not expose its internal execution plan.
+      danglingDataSets.remove(pCollectionId);
+      return dataSet;
+    }
+
+    public Collection<DataSet<?>> getDanglingDataSets() {
+      return danglingDataSets.stream().map(id -> 
dataSets.get(id)).collect(Collectors.toList());
+    }
+  }
+
+  /** Transform translation interface. */
+  @FunctionalInterface
+  private interface PTransformTranslator {
+    /** Translate a PTransform into the given translation context. */
+    void translate(
+        PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context);
+  }
+
+  private final Map<String, PTransformTranslator> urnToTransformTranslator;
+
+  private FlinkBatchPortablePipelineTranslator(
+      Map<String, PTransformTranslator> urnToTransformTranslator) {
+    this.urnToTransformTranslator = urnToTransformTranslator;
+  }
+
+  @Override
+  public void translate(BatchTranslationContext context, RunnerApi.Pipeline 
pipeline) {
+    // Use a QueryablePipeline to traverse transforms topologically.
+    QueryablePipeline p =
+        QueryablePipeline.forTransforms(
+            pipeline.getRootTransformIdsList(), pipeline.getComponents());
+    for (PipelineNode.PTransformNode transform : 
p.getTopologicallyOrderedTransforms()) {
+      urnToTransformTranslator
+          .getOrDefault(
+              transform.getTransform().getSpec().getUrn(),
+              FlinkBatchPortablePipelineTranslator::urnNotFound)
+          .translate(transform, pipeline, context);
+    }
+
+    // Ensure that side effects are performed for unconsumed DataSets.
+    for (DataSet<?> dataSet : context.getDanglingDataSets()) {
+      dataSet.output(new DiscardingOutputFormat<>());
+    }
+  }
+
+  private static <InputT> void translateView(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+
+    DataSet<WindowedValue<InputT>> inputDataSet =
+        context.getDataSetOrThrow(
+            
Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
+
+    context.addDataSet(
+        
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), 
inputDataSet);
+  }
+
+  private static <K, V> void translateReshuffle(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+    DataSet<WindowedValue<KV<K, V>>> inputDataSet =
+        context.getDataSetOrThrow(
+            
Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
+    context.addDataSet(
+        
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()),
+        inputDataSet.rebalance());
+  }
+
+  private static <T> void translateAssignWindows(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+    RunnerApi.Components components = pipeline.getComponents();
+    RunnerApi.WindowIntoPayload payload;
+    try {
+      payload =
+          
RunnerApi.WindowIntoPayload.parseFrom(transform.getTransform().getSpec().getPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(e);
+    }
+    WindowFn<T, ? extends BoundedWindow> windowFn =
+        (WindowFn<T, ? extends BoundedWindow>)
+            
WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());
+
+    String inputCollectionId =
+        
Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+    String outputCollectionId =
+        
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
+    Coder<WindowedValue<T>> outputCoder =
+        instantiateRunnerWireCoder(outputCollectionId, components);
+    TypeInformation<WindowedValue<T>> resultTypeInfo = new 
CoderTypeInformation<>(outputCoder);
+
+    DataSet<WindowedValue<T>> inputDataSet = 
context.getDataSetOrThrow(inputCollectionId);
+
+    FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+        new FlinkAssignWindows<>(windowFn);
+
+    DataSet<WindowedValue<T>> resultDataSet =
+        inputDataSet
+            .flatMap(assignWindowsFunction)
+            .name(transform.getTransform().getUniqueName())
+            .returns(resultTypeInfo);
+
+    context.addDataSet(outputCollectionId, resultDataSet);
+  }
+
+  private static <InputT> void translateExecutableStage(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+    // TODO: Fail on stateful DoFns for now.
+    // TODO: Support stateful DoFns by inserting group-by-keys where necessary.
+    // TODO: Fail on splittable DoFns.
+    // TODO: Special-case single outputs to avoid multiplexing PCollections.
+
+    RunnerApi.Components components = pipeline.getComponents();
+    Map<String, String> outputs = transform.getTransform().getOutputsMap();
+    // Mapping from PCollection id to coder tag id.
+    BiMap<String, Integer> outputMap = createOutputMap(outputs.values());
+    // Collect all output Coders and create a UnionCoder for our tagged 
outputs.
+    List<Coder<?>> unionCoders = Lists.newArrayList();
+    // Enforce tuple tag sorting by union tag index.
+    Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
+    for (String collectionId : new TreeMap<>(outputMap.inverse()).values()) {
+      Coder<WindowedValue<?>> windowCoder =
+          (Coder) instantiateRunnerWireCoder(collectionId, components);
+      outputCoders.put(collectionId, windowCoder);
+      unionCoders.add(windowCoder);
+    }
+    UnionCoder unionCoder = UnionCoder.of(unionCoders);
+    TypeInformation<RawUnionValue> typeInformation = new 
CoderTypeInformation<>(unionCoder);
+
+    RunnerApi.ExecutableStagePayload stagePayload;
+    try {
+      stagePayload =
+          RunnerApi.ExecutableStagePayload.parseFrom(
+              transform.getTransform().getSpec().getPayload());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    FlinkExecutableStageFunction<InputT> function =
+        new FlinkExecutableStageFunction<>(
+            stagePayload,
+            PipelineOptionsTranslation.toProto(context.getPipelineOptions()),
+            outputMap);
+
+    DataSet<WindowedValue<InputT>> inputDataSet =
+        context.getDataSetOrThrow(stagePayload.getInput());
+
+    MapPartitionOperator<WindowedValue<InputT>, RawUnionValue> taggedDataset =
+        new MapPartitionOperator<>(
+            inputDataSet, typeInformation, function, 
transform.getTransform().getUniqueName());
+
+    for (SideInputId sideInputId : stagePayload.getSideInputsList()) {
+      String collectionId =
+          components
+              .getTransformsOrThrow(sideInputId.getTransformId())
+              .getInputsOrThrow(sideInputId.getLocalName());
+      // Register under the global PCollection name. Only 
ExecutableStageFunction needs to know the
+      // mapping from local name to global name and how to translate the 
broadcast data to a state
+      // API view.
+      taggedDataset.withBroadcastSet(context.getDataSetOrThrow(collectionId), 
collectionId);
+    }
+
+    for (String collectionId : outputs.values()) {
+      pruneOutput(
+          taggedDataset,
+          context,
+          outputMap.get(collectionId),
+          (Coder) outputCoders.get(collectionId),
+          transform.getTransform().getUniqueName(),
+          collectionId);
+    }
+    if (outputs.isEmpty()) {
+      // NOTE: After pipeline translation, we traverse the set of unconsumed 
PCollections and add a
+      // no-op sink to each to make sure they are materialized by Flink. 
However, some SDK-executed
+      // stages have no runner-visible output after fusion. We handle this 
case by adding a sink
+      // here.
+      taggedDataset.output(new DiscardingOutputFormat());
+    }
+  }
+
+  private static <T> void translateFlatten(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+    Map<String, String> allInputs = transform.getTransform().getInputsMap();
+    DataSet<WindowedValue<T>> result = null;
+
+    if (allInputs.isEmpty()) {
+
+      // Create an empty dummy source to satisfy downstream operations. We 
cannot create an empty
+      // source in Flink, so we send the DataSet to a flatMap that never 
forwards its element.
+      DataSource<String> dummySource = 
context.getExecutionEnvironment().fromElements("dummy");
+      result =
+          dummySource
+              .<WindowedValue<T>>flatMap(
+                  (s, collector) -> {
+                    // never return anything
+                  })
+              .returns(
+                  new CoderTypeInformation<>(
+                      WindowedValue.getFullCoder(
+                          (Coder<T>) VoidCoder.of(), 
GlobalWindow.Coder.INSTANCE)));
+    } else {
+      for (String pCollectionId : allInputs.values()) {
+        DataSet<WindowedValue<T>> current = 
context.getDataSetOrThrow(pCollectionId);
+        if (result == null) {
+          result = current;
+        } else {
+          result = result.union(current);
+        }
+      }
+    }
+
+    // Insert a dummy filter. Flink produces duplicate elements after the 
union in some cases if we
+    // don't do so.
+    result = result.filter(tWindowedValue -> true).name("UnionFixFilter");
+    context.addDataSet(
+        
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), 
result);
+  }
+
+  private static <K, V> void translateGroupByKey(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+    String inputPCollectionId =
+        
Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+    DataSet<WindowedValue<KV<K, V>>> inputDataSet = 
context.getDataSetOrThrow(inputPCollectionId);
+    RunnerApi.WindowingStrategy windowingStrategyProto =
+        pipeline
+            .getComponents()
+            .getWindowingStrategiesOrThrow(
+                pipeline
+                    .getComponents()
+                    .getPcollectionsOrThrow(inputPCollectionId)
+                    .getWindowingStrategyId());
+
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(pipeline.getComponents());
+
+    WindowingStrategy<Object, BoundedWindow> windowingStrategy;
+    try {
+      windowingStrategy =
+          (WindowingStrategy<Object, BoundedWindow>)
+              WindowingStrategyTranslation.fromProto(windowingStrategyProto, 
rehydratedComponents);
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalStateException(
+          String.format(
+              "Unable to hydrate GroupByKey windowing strategy %s.", 
windowingStrategyProto),
+          e);
+    }
+
+    WindowedValueCoder<KV<K, V>> inputCoder =
+        instantiateRunnerWireCoder(inputPCollectionId, 
pipeline.getComponents());
+
+    KvCoder<K, V> inputElementCoder = (KvCoder<K, V>) 
inputCoder.getValueCoder();
+
+    Concatenate<V> combineFn = new Concatenate<>();
+    Coder<List<V>> accumulatorCoder =
+        combineFn.getAccumulatorCoder(
+            CoderRegistry.createDefault(), inputElementCoder.getValueCoder());
+
+    Coder<WindowedValue<KV<K, List<V>>>> outputCoder =
+        WindowedValue.getFullCoder(
+            KvCoder.of(inputElementCoder.getKeyCoder(), accumulatorCoder),
+            windowingStrategy.getWindowFn().windowCoder());
+
+    TypeInformation<WindowedValue<KV<K, List<V>>>> partialReduceTypeInfo =
+        new CoderTypeInformation<>(outputCoder);
+
+    Grouping<WindowedValue<KV<K, V>>> inputGrouping =
+        inputDataSet.groupBy(new 
KvKeySelector<>(inputElementCoder.getKeyCoder()));
+
+    FlinkPartialReduceFunction<K, V, List<V>, ?> partialReduceFunction =
+        new FlinkPartialReduceFunction<>(
+            combineFn, windowingStrategy, Collections.emptyMap(), 
context.getPipelineOptions());
+
+    FlinkReduceFunction<K, List<V>, List<V>, ?> reduceFunction =
+        new FlinkReduceFunction<>(
+            combineFn, windowingStrategy, Collections.emptyMap(), 
context.getPipelineOptions());
+
+    // Partially GroupReduce the values into the intermediate format AccumT 
(combine)
+    GroupCombineOperator<WindowedValue<KV<K, V>>, WindowedValue<KV<K, 
List<V>>>> groupCombine =
+        new GroupCombineOperator<>(
+            inputGrouping,
+            partialReduceTypeInfo,
+            partialReduceFunction,
+            "GroupCombine: " + transform.getTransform().getUniqueName());
+
+    Grouping<WindowedValue<KV<K, List<V>>>> intermediateGrouping =
+        groupCombine.groupBy(new 
KvKeySelector<>(inputElementCoder.getKeyCoder()));
+
+    // Fully reduce the values and create output format VO
+    GroupReduceOperator<WindowedValue<KV<K, List<V>>>, WindowedValue<KV<K, 
List<V>>>>
+        outputDataSet =
+            new GroupReduceOperator<>(
+                intermediateGrouping,
+                partialReduceTypeInfo,
+                reduceFunction,
+                transform.getTransform().getUniqueName());
+
+    context.addDataSet(
+        
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), 
outputDataSet);
+  }
+
+  private static void translateImpulse(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+    TypeInformation<WindowedValue<byte[]>> typeInformation =
+        new CoderTypeInformation<>(
+            WindowedValue.getFullCoder(ByteArrayCoder.of(), 
GlobalWindow.Coder.INSTANCE));
+    DataSource<WindowedValue<byte[]>> dataSource =
+        new DataSource<>(
+            context.getExecutionEnvironment(),
+            new ImpulseInputFormat(),
+            typeInformation,
+            transform.getTransform().getUniqueName());
+
+    context.addDataSet(
+        
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), 
dataSource);
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
+   * expected to crash!
+   *
+   * <p>This is copied from the dataflow runner code.
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, 
List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> 
inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, 
Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  private static void urnNotFound(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, 
BatchTranslationContext context) {
+    throw new IllegalArgumentException(
+        String.format(
+            "Unknown type of URN %s for PTransform with id %s.",
+            transform.getTransform().getSpec().getUrn(), transform.getId()));
+  }
+
+  private static void pruneOutput(
+      DataSet<RawUnionValue> taggedDataset,
+      BatchTranslationContext context,
+      int unionTag,
+      Coder<WindowedValue<Object>> outputCoder,
+      String transformName,
+      String collectionId) {
+    TypeInformation<WindowedValue<Object>> outputType = new 
CoderTypeInformation<>(outputCoder);
+    FlinkExecutableStagePruningFunction<Object> pruningFunction =
+        new FlinkExecutableStagePruningFunction<>(unionTag);
+    FlatMapOperator<RawUnionValue, WindowedValue<Object>> pruningOperator =
+        new FlatMapOperator<>(
+            taggedDataset,
+            outputType,
+            pruningFunction,
+            String.format("%s/out.%d", transformName, unionTag));
+    context.addDataSet(collectionId, pruningOperator);
+  }
+
+  /**  Creates a mapping from PCollection id to output tag integer. */
+  private static BiMap<String, Integer> createOutputMap(Iterable<String> 
localOutputs) {
+    ImmutableBiMap.Builder<String, Integer> builder = ImmutableBiMap.builder();
+    int outputIndex = 0;
+    for (String tag : localOutputs) {
+      builder.put(tag, outputIndex);
+      outputIndex++;
+    }
+    return builder.build();
+  }
+
+  /** Instantiates a Java coder for windowed values of the given PCollection id. */
+  private static <T> WindowedValueCoder<T> instantiateRunnerWireCoder(
+      String collectionId, RunnerApi.Components components) {
+    RunnerApi.PCollection collection = 
components.getPcollectionsOrThrow(collectionId);
+    PCollectionNode collectionNode = PipelineNode.pCollection(collectionId, 
collection);
+
+    // Instantiate the wire coder by length-prefixing unknown coders.
+    RunnerApi.MessageWithComponents protoCoder =
+        WireCoders.createRunnerWireCoder(collectionNode, components, 
components::containsCoders);
+    Coder<?> javaCoder;
+    try {
+      javaCoder =
+          CoderTranslation.fromProto(
+              protoCoder.getCoder(),
+              RehydratedComponents.forComponents(protoCoder.getComponents()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    checkArgument(javaCoder instanceof WindowedValueCoder);
+    return (WindowedValueCoder<T>) javaCoder;
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
new file mode 100644
index 00000000000..a06cea52909
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import java.util.List;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utilities for Flink execution environments. */
+public class FlinkExecutionEnvironments {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the adequate Flink {@link
+   * org.apache.flink.api.java.ExecutionEnvironment} depending on the user-specified options.
+   */
+  public static ExecutionEnvironment createBatchExecutionEnvironment(FlinkPipelineOptions options) {
+
+    LOG.info("Creating a Batch Execution Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    ExecutionEnvironment flinkBatchEnv;
+
+    // depending on the master, create the right environment.
+    if ("[local]".equals(masterUrl)) {
+      flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+    } else if ("[collection]".equals(masterUrl)) {
+      flinkBatchEnv = new CollectionEnvironment();
+    } else if ("[auto]".equals(masterUrl)) {
+      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      flinkBatchEnv =
+          ExecutionEnvironment.createRemoteEnvironment(
+              parts[0],
+              Integer.parseInt(parts[1]),
+              stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
+      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof 
CollectionEnvironment)) {
+      flinkBatchEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkBatchEnv.getParallelism());
+
+    if (options.getObjectReuse()) {
+      flinkBatchEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkBatchEnv.getConfig().disableObjectReuse();
+    }
+
+    return flinkBatchEnv;
+  }
+
+  /**
+   * If the submitted job is a stream processing job, this method creates the adequate Flink {@link
+   * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending on the
+   * user-specified options.
+   */
+  public static StreamExecutionEnvironment createStreamExecutionEnvironment(
+      FlinkPipelineOptions options) {
+
+    LOG.info("Creating a Streaming Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    StreamExecutionEnvironment flinkStreamEnv = null;
+
+    // depending on the master, create the right environment.
+    if ("[local]".equals(masterUrl)) {
+      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+    } else if ("[auto]".equals(masterUrl)) {
+      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      flinkStreamEnv =
+          StreamExecutionEnvironment.createRemoteEnvironment(
+              parts[0],
+              Integer.parseInt(parts[1]),
+              stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
+      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1) {
+      flinkStreamEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkStreamEnv.getParallelism());
+
+    if (options.getObjectReuse()) {
+      flinkStreamEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkStreamEnv.getConfig().disableObjectReuse();
+    }
+
+    // default to event time
+    flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // for the following 2 parameters, a value of -1 means that Flink will use
+    // the default values as specified in the configuration.
+    int numRetries = options.getNumberOfExecutionRetries();
+    if (numRetries != -1) {
+      flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+    }
+    long retryDelay = options.getExecutionRetryDelay();
+    if (retryDelay != -1) {
+      flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+    }
+
+    // A value of -1 corresponds to disabled checkpointing (see 
CheckpointConfig in Flink).
+    // If the value is not -1, then the validity checks are applied.
+    // By default, checkpointing is disabled.
+    long checkpointInterval = options.getCheckpointingInterval();
+    if (checkpointInterval != -1) {
+      if (checkpointInterval < 1) {
+        throw new IllegalArgumentException("The checkpoint interval must be 
positive");
+      }
+      flinkStreamEnv.enableCheckpointing(checkpointInterval, 
options.getCheckpointingMode());
+      flinkStreamEnv
+          .getCheckpointConfig()
+          .setCheckpointTimeout(options.getCheckpointTimeoutMillis());
+      boolean externalizedCheckpoint = 
options.isExternalizedCheckpointsEnabled();
+      boolean retainOnCancellation = 
options.getRetainExternalizedCheckpointsOnCancellation();
+      if (externalizedCheckpoint) {
+        flinkStreamEnv
+            .getCheckpointConfig()
+            .enableExternalizedCheckpoints(
+                retainOnCancellation
+                    ? ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
+                    : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+      }
+    }
+
+    // State backend
+    final AbstractStateBackend stateBackend = options.getStateBackend();
+    if (stateBackend != null) {
+      flinkStreamEnv.setStateBackend(stateBackend);
+    }
+
+    return flinkStreamEnv;
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index ac956d5c13d..a178acf45b2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -20,18 +20,11 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.IOException;
-import java.util.List;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The class that instantiates and manages the execution of a given job.
@@ -43,9 +36,6 @@
  */
 class FlinkPipelineExecutionEnvironment {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
-
   private final FlinkPipelineOptions options;
 
   /**
@@ -104,10 +94,10 @@ public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
 
     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
-      this.flinkStreamEnv = createStreamExecutionEnvironment();
+      this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options);
       translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
     } else {
-      this.flinkBatchEnv = createBatchExecutionEnvironment();
+      this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options);
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
     }
 
@@ -129,135 +119,4 @@ public JobExecutionResult executePipeline() throws Exception {
     }
   }
 
-  /**
-   * If the submitted job is a batch processing job, this method creates the 
adequate
-   * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
-   * on the user-specified options.
-   */
-  private ExecutionEnvironment createBatchExecutionEnvironment() {
-
-    LOG.info("Creating the required Batch Execution Environment.");
-
-    String masterUrl = options.getFlinkMaster();
-    ExecutionEnvironment flinkBatchEnv;
-
-    // depending on the master, create the right environment.
-    if ("[local]".equals(masterUrl)) {
-      flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
-    } else if ("[collection]".equals(masterUrl)) {
-      flinkBatchEnv = new CollectionEnvironment();
-    } else if ("[auto]".equals(masterUrl)) {
-      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
-    } else if (masterUrl.matches(".*:\\d*")) {
-      String[] parts = masterUrl.split(":");
-      List<String> stagingFiles = options.getFilesToStage();
-      flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
-          Integer.parseInt(parts[1]),
-          stagingFiles.toArray(new String[stagingFiles.size()]));
-    } else {
-      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
-      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
-    }
-
-    // set the correct parallelism.
-    if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof 
CollectionEnvironment)) {
-      flinkBatchEnv.setParallelism(options.getParallelism());
-    }
-
-    // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkBatchEnv.getParallelism());
-
-    if (options.getObjectReuse()) {
-      flinkBatchEnv.getConfig().enableObjectReuse();
-    } else {
-      flinkBatchEnv.getConfig().disableObjectReuse();
-    }
-
-    return flinkBatchEnv;
-  }
-
-  /**
-   * If the submitted job is a stream processing job, this method creates the 
adequate
-   * Flink {@link 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
-   * on the user-specified options.
-   */
-  private StreamExecutionEnvironment createStreamExecutionEnvironment() {
-
-    LOG.info("Creating the required Streaming Environment.");
-
-    String masterUrl = options.getFlinkMaster();
-    StreamExecutionEnvironment flinkStreamEnv = null;
-
-    // depending on the master, create the right environment.
-    if ("[local]".equals(masterUrl)) {
-      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
-    } else if ("[auto]".equals(masterUrl)) {
-      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-    } else if (masterUrl.matches(".*:\\d*")) {
-      String[] parts = masterUrl.split(":");
-      List<String> stagingFiles = options.getFilesToStage();
-      flinkStreamEnv = 
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
-          Integer.parseInt(parts[1]), stagingFiles.toArray(new 
String[stagingFiles.size()]));
-    } else {
-      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
-      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-    }
-
-    // set the correct parallelism.
-    if (options.getParallelism() != -1) {
-      flinkStreamEnv.setParallelism(options.getParallelism());
-    }
-
-    // set parallelism in the options (required by some execution code)
-    options.setParallelism(flinkStreamEnv.getParallelism());
-
-    if (options.getObjectReuse()) {
-      flinkStreamEnv.getConfig().enableObjectReuse();
-    } else {
-      flinkStreamEnv.getConfig().disableObjectReuse();
-    }
-
-    // default to event time
-    flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-    // for the following 2 parameters, a value of -1 means that Flink will use
-    // the default values as specified in the configuration.
-    int numRetries = options.getNumberOfExecutionRetries();
-    if (numRetries != -1) {
-      flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
-    }
-    long retryDelay = options.getExecutionRetryDelay();
-    if (retryDelay != -1) {
-      flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
-    }
-
-    // A value of -1 corresponds to disabled checkpointing (see 
CheckpointConfig in Flink).
-    // If the value is not -1, then the validity checks are applied.
-    // By default, checkpointing is disabled.
-    long checkpointInterval = options.getCheckpointingInterval();
-    if (checkpointInterval != -1) {
-      if (checkpointInterval < 1) {
-        throw new IllegalArgumentException("The checkpoint interval must be 
positive");
-      }
-      flinkStreamEnv.enableCheckpointing(checkpointInterval, 
options.getCheckpointingMode());
-      flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout(
-          options.getCheckpointTimeoutMillis());
-      boolean externalizedCheckpoint = 
options.isExternalizedCheckpointsEnabled();
-      boolean retainOnCancellation = 
options.getRetainExternalizedCheckpointsOnCancellation();
-      if (externalizedCheckpoint) {
-        flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
-            retainOnCancellation ? 
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
-                : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
-      }
-    }
-
-    // State backend
-    final AbstractStateBackend stateBackend = options.getStateBackend();
-    if (stateBackend != null) {
-      flinkStreamEnv.setStateBackend(stateBackend);
-    }
-
-    return flinkStreamEnv;
-  }
-
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
new file mode 100644
index 00000000000..d8a56c9badc
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Interface for portable Flink translators. This allows for a uniform invocation pattern for
+ * pipeline translation between streaming and portable runners.
+ *
+ * <p>Pipeline translators will generally provide a mechanism to produce the translation contexts
+ * that they use for pipeline translation. Post translation, the translation context should contain
+ * a pipeline plan that has not yet been executed.
+ */
+public interface FlinkPortablePipelineTranslator<
+    T extends FlinkPortablePipelineTranslator.TranslationContext> {
+
+  /** The context used for pipeline translation. */
+  interface TranslationContext {
+    FlinkPipelineOptions getPipelineOptions();
+  }
+
+  /** Translates the given pipeline. */
+  void translate(T context, RunnerApi.Pipeline pipeline);
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
new file mode 100644
index 00000000000..ec29c2606f1
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import com.google.protobuf.Struct;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+// TODO: https://issues.apache.org/jira/browse/BEAM-2597 Implement this executable stage operator.
+/**
+ * Flink operator that passes its input DataSet through an SDK-executed {@link
+ * org.apache.beam.runners.core.construction.graph.ExecutableStage}.
+ *
+ * <p>The output of this operation is a multiplexed DataSet whose elements are tagged with a union
+ * coder. The coder's tags are determined by the output coder map. The resulting data set should be
+ * further processed by a {@link FlinkExecutableStagePruningFunction}.
+ */
+public class FlinkExecutableStageFunction<InputT>
+    extends RichMapPartitionFunction<WindowedValue<InputT>, RawUnionValue> {
+
+  // The executable stage this function will run.
+  private final RunnerApi.ExecutableStagePayload stagePayload;
+  // Pipeline options. Used for provisioning api.
+  private final Struct pipelineOptions;
+  // Map from PCollection id to the union tag used to represent this PCollection in the output.
+  private final Map<String, Integer> outputMap;
+
+  public FlinkExecutableStageFunction(
+      RunnerApi.ExecutableStagePayload stagePayload,
+      Struct pipelineOptions,
+      Map<String, Integer> outputMap) {
+    this.stagePayload = stagePayload;
+    this.pipelineOptions = pipelineOptions;
+    this.outputMap = outputMap;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void mapPartition(
+      Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> 
collector)
+      throws Exception {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws Exception {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStagePruningFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStagePruningFunction.java
new file mode 100644
index 00000000000..1680495ea64
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStagePruningFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+/** A Flink function that demultiplexes output from a {@link FlinkExecutableStageFunction}. */
+public class FlinkExecutableStagePruningFunction<T>
+    implements FlatMapFunction<RawUnionValue, WindowedValue<T>> {
+
+  private final int unionTag;
+
+  /**
+   * Creates a {@link FlinkExecutableStagePruningFunction} that extracts elements of the given union
+   * tag.
+   */
+  public FlinkExecutableStagePruningFunction(int unionTag) {
+    this.unionTag = unionTag;
+  }
+
+  @Override
+  public void flatMap(RawUnionValue rawUnionValue, Collector<WindowedValue<T>> collector) {
+    if (rawUnionValue.getUnionTag() == unionTag) {
+      collector.collect((WindowedValue<T>) rawUnionValue.getValue());
+    }
+  }
+}
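
A quick illustration (again, not part of the diff) of the demultiplexing contract: only values whose union tag matches the tag the function was constructed with are forwarded to the collector. The tag value 1, the String payloads, and the class name PruningSketch are arbitrary assumptions for this example; ListCollector is Flink's list-backed Collector, used here purely for demonstration.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.functions.util.ListCollector;

class PruningSketch {
  public static void main(String[] args) {
    List<WindowedValue<String>> kept = new ArrayList<>();
    // Keep only elements tagged 1; the tag is whatever the output map assigned to the PCollection.
    FlinkExecutableStagePruningFunction<String> prune = new FlinkExecutableStagePruningFunction<>(1);
    prune.flatMap(new RawUnionValue(1, WindowedValue.valueInGlobalWindow("kept")), new ListCollector<>(kept));
    prune.flatMap(new RawUnionValue(0, WindowedValue.valueInGlobalWindow("dropped")), new ListCollector<>(kept));
    // kept now holds a single element: the globally windowed "kept" value.
  }
}
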
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index 0055273b9b8..2ecad1133f3 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.fnexecution.control;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.runners.core.construction.SyntheticComponents.uniqueId;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Iterables;
@@ -26,7 +27,6 @@
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
@@ -39,13 +39,13 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.CoderTranslation;
-import org.apache.beam.runners.core.construction.ModelCoders;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
-import org.apache.beam.runners.fnexecution.graph.LengthPrefixUnknownCoders;
+import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
@@ -102,7 +102,7 @@ public static ExecutableProcessBundleDescriptor fromExecutableStage(
             .setCoderId(inputWireCoderId)
             .build();
     String inputId =
-        uniquifyId(
+        uniqueId(
             String.format("fn/read/%s", inputPCollection.getId()),
             bundleDescriptorBuilder::containsTransforms);
     PTransform inputTransform =
@@ -132,7 +132,7 @@ private static TargetEncoding addStageOutput(
     RemoteGrpcPortWrite outputWrite =
         RemoteGrpcPortWrite.writeToPort(outputPCollection.getId(), outputPort);
     String outputId =
-        uniquifyId(
+        uniqueId(
             String.format("fn/write/%s", outputPCollection.getId()),
             bundleDescriptorBuilder::containsTransforms);
     PTransform outputTransform = outputWrite.toPTransform();
@@ -161,44 +161,17 @@ private static String addWireCoder(
       Components components,
       ProcessBundleDescriptor.Builder bundleDescriptorBuilder) {
     MessageWithComponents wireCoder =
-        getWireCoder(pCollection, components, bundleDescriptorBuilder::containsCoders);
+        WireCoders.createSdkWireCoder(
+            pCollection, components, bundleDescriptorBuilder::containsCoders);
     bundleDescriptorBuilder.putAllCoders(wireCoder.getComponents().getCodersMap());
     String wireCoderId =
-        uniquifyId(
+        uniqueId(
             String.format("fn/wire/%s", pCollection.getId()),
             bundleDescriptorBuilder::containsCoders);
     bundleDescriptorBuilder.putCoders(wireCoderId, wireCoder.getCoder());
     return wireCoderId;
   }
 
-  private static MessageWithComponents getWireCoder(
-      PCollectionNode pCollectionNode, Components components, Predicate<String> usedIds) {
-    String elementCoderId = pCollectionNode.getPCollection().getCoderId();
-    String windowingStrategyId = pCollectionNode.getPCollection().getWindowingStrategyId();
-    String windowCoderId =
-        components.getWindowingStrategiesOrThrow(windowingStrategyId).getWindowCoderId();
-    RunnerApi.Coder windowedValueCoder =
-        ModelCoders.windowedValueCoder(elementCoderId, windowCoderId);
-    // Add the original WindowedValue<T, W> coder to the components;
-    String windowedValueId =
-        uniquifyId(String.format("fn/wire/%s", pCollectionNode.getId()), usedIds);
-    return LengthPrefixUnknownCoders.forCoder(
-        windowedValueId,
-        components.toBuilder().putCoders(windowedValueId, windowedValueCoder).build(),
-        false);
-  }
-
-  private static String uniquifyId(String idBase, Predicate<String> idUsed) {
-    if (!idUsed.test(idBase)) {
-      return idBase;
-    }
-    int i = 0;
-    while (idUsed.test(String.format("%s_%s", idBase, i))) {
-      i++;
-    }
-    return String.format("%s_%s", idBase, i);
-  }
-
   private static Coder<WindowedValue<?>> instantiateWireCoder(
       RemoteGrpcPort port, Map<String, RunnerApi.Coder> components) throws IOException {
     MessageWithComponents byteArrayCoder =
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
similarity index 99%
rename from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java
rename to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
index afae59c3bba..34a17e41b45 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCoders.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
@@ -14,7 +14,7 @@
  * the License.
  */
 
-package org.apache.beam.runners.fnexecution.graph;
+package org.apache.beam.runners.fnexecution.wire;
 
 import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java
new file mode 100644
index 00000000000..3ec1fc74acb
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.wire;
+
+import java.util.function.Predicate;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
+import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+
+/** Helpers to construct coders for gRPC port reads and writes. */
+public class WireCoders {
+  /** Creates an SDK-side wire coder for a port read/write for the given PCollection. */
+  public static RunnerApi.MessageWithComponents createSdkWireCoder(
+      PCollectionNode pCollectionNode, RunnerApi.Components components, Predicate<String> idUsed) {
+    return createWireCoder(pCollectionNode, components, idUsed, false);
+  }
+
+  /**
+   * Creates a runner-side wire coder for a port read/write for the given PCollection. Returns a
+   * windowed value coder. The element coder itself is wrapped so that any coders unknown to the
+   * runner are replaced with length-prefixed byte array coders.
+   */
+  public static RunnerApi.MessageWithComponents createRunnerWireCoder(
+      PCollectionNode pCollectionNode, RunnerApi.Components components, Predicate<String> idUsed) {
+    return createWireCoder(pCollectionNode, components, idUsed, true);
+  }
+
+  private static RunnerApi.MessageWithComponents createWireCoder(
+      PCollectionNode pCollectionNode,
+      RunnerApi.Components components,
+      Predicate<String> idUsed,
+      boolean useByteArrayCoder) {
+    String elementCoderId = pCollectionNode.getPCollection().getCoderId();
+    String windowingStrategyId = pCollectionNode.getPCollection().getWindowingStrategyId();
+    String windowCoderId =
+        components.getWindowingStrategiesOrThrow(windowingStrategyId).getWindowCoderId();
+    RunnerApi.Coder windowedValueCoder =
+        ModelCoders.windowedValueCoder(elementCoderId, windowCoderId);
+    // Add the original WindowedValue<T, W> coder to the components.
+    String windowedValueId =
+        SyntheticComponents.uniqueId(String.format("fn/wire/%s", pCollectionNode.getId()), idUsed);
+    return LengthPrefixUnknownCoders.forCoder(
+        windowedValueId,
+        components.toBuilder().putCoders(windowedValueId, windowedValueCoder).build(),
+        useByteArrayCoder);
+  }
+
+  // Not instantiable.
+  private WireCoders() {}
+}
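
For context, a hedged sketch of the runner-side counterpart to addWireCoder in ProcessBundleDescriptors above: it registers a runner wire coder for a PCollection under a collision-free synthetic id. The helper name addRunnerWireCoder and the separate Components.Builder parameter are assumptions for illustration; only the WireCoders and SyntheticComponents calls come from this change.

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.SyntheticComponents;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.fnexecution.wire.WireCoders;

class RunnerWireCoderSketch {
  /** Adds a runner-side wire coder for the given PCollection and returns its synthetic id. */
  static String addRunnerWireCoder(
      PCollectionNode pCollectionNode,
      RunnerApi.Components components,
      RunnerApi.Components.Builder componentsBuilder) {
    RunnerApi.MessageWithComponents wireCoder =
        WireCoders.createRunnerWireCoder(
            pCollectionNode, components, componentsBuilder::containsCoders);
    // Pull in any coder components introduced by length-prefixing.
    componentsBuilder.putAllCoders(wireCoder.getComponents().getCodersMap());
    // Register the wire coder itself under an id that cannot collide with existing coder ids.
    String wireCoderId =
        SyntheticComponents.uniqueId(
            String.format("fn/wire/%s", pCollectionNode.getId()),
            componentsBuilder::containsCoders);
    componentsBuilder.putCoders(wireCoderId, wireCoder.getCoder());
    return wireCoderId;
  }
}
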
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/package-info.java
new file mode 100644
index 00000000000..4da125270cc
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Wire coders for communications between runner and SDK harness. */
+package org.apache.beam.runners.fnexecution.wire;
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCodersTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCodersTest.java
similarity index 99%
rename from runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCodersTest.java
rename to runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCodersTest.java
index 4cfa5c0c545..e18151d293c 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/graph/LengthPrefixUnknownCodersTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCodersTest.java
@@ -13,7 +13,7 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.beam.runners.fnexecution.graph;
+package org.apache.beam.runners.fnexecution.wire;
 
 import static org.junit.Assert.assertEquals;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 98037)
    Time Spent: 8h 40m  (was: 8.5h)

> Flink runner translates batch pipelines directly by proto
> ---------------------------------------------------------
>
>                 Key: BEAM-3972
>                 URL: https://issues.apache.org/jira/browse/BEAM-3972
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Major
>          Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> The non-portable runner uses rehydrated pipelines which lack necessary 
> information. The portable Flink runner needs to translate pipelines directly 
> by proto in order to wire components into individual executable stages 
> correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
