[ 
https://issues.apache.org/jira/browse/BEAM-4073?focusedWorklogId=102281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102281
 ]

ASF GitHub Bot logged work on BEAM-4073:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/May/18 20:51
            Start Date: 15/May/18 20:51
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5271: [BEAM-4073] Migrate DirectRunner Evaluators to use Portable Graph Components
URL: https://github.com/apache/beam/pull/5271
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not display it once such a pull request is merged:

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
index 86031aa0356..3544d66d850 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
@@ -40,19 +40,16 @@ private ModelCoders() {}
 
   public static final String ITERABLE_CODER_URN = 
getUrn(StandardCoders.Enum.ITERABLE);
   public static final String KV_CODER_URN = getUrn(StandardCoders.Enum.KV);
-  public static final String LENGTH_PREFIX_CODER_URN =
-      getUrn(StandardCoders.Enum.LENGTH_PREFIX);
+  public static final String LENGTH_PREFIX_CODER_URN = 
getUrn(StandardCoders.Enum.LENGTH_PREFIX);
 
-  public static final String GLOBAL_WINDOW_CODER_URN =
-      getUrn(StandardCoders.Enum.GLOBAL_WINDOW);
+  public static final String GLOBAL_WINDOW_CODER_URN = 
getUrn(StandardCoders.Enum.GLOBAL_WINDOW);
   // This isn't strictly required once there's a way to represent an 'unknown 
window' (i.e. the
   // custom window encoding + the maximum timestamp of the window, this can be 
used for interval
   // windows.
   public static final String INTERVAL_WINDOW_CODER_URN =
       getUrn(StandardCoders.Enum.INTERVAL_WINDOW);
 
-  public static final String WINDOWED_VALUE_CODER_URN =
-      getUrn(StandardCoders.Enum.WINDOWED_VALUE);
+  public static final String WINDOWED_VALUE_CODER_URN = 
getUrn(StandardCoders.Enum.WINDOWED_VALUE);
 
   private static final Set<String> MODEL_CODER_URNS =
       ImmutableSet.of(
diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index f0e6b74de69..220f4fcffb9 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -56,6 +56,7 @@ dependencies {
   compile project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
   compile project(path: ":beam-runners-core-java", configuration: "shadow")
   compile project(path: ":beam-runners-local-java-core", configuration: 
"shadow")
+  compile project(path: ":beam-runners-java-fn-execution", configuration: 
"shadow")
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.joda_time
   shadow library.java.findbugs_jsr305
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index a2bdb32011e..e40e4b35fa7 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -204,6 +204,11 @@
       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-java-fn-execution</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java
index 5ee61dfcbea..48d13487226 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactory.java
@@ -17,7 +17,11 @@
  */
 package org.apache.beam.runners.direct.portable;
 
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.direct.ExecutableGraph;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -27,10 +31,13 @@
  * {@link PTransform}.
  */
 class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
-  private final EvaluationContext evaluationContext;
+  private final BundleFactory bundleFactory;
+  private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
 
-  FlattenEvaluatorFactory(EvaluationContext evaluationContext) {
-    this.evaluationContext = evaluationContext;
+  FlattenEvaluatorFactory(
+      BundleFactory bundleFactory, ExecutableGraph<PTransformNode, 
PCollectionNode> graph) {
+    this.bundleFactory = bundleFactory;
+    this.graph = graph;
   }
 
   @Override
@@ -42,31 +49,31 @@
   }
 
   @Override
-  public void cleanup() throws Exception {}
+  public void cleanup() {}
 
   private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
-      final PTransformNode application) {
-    throw new UnsupportedOperationException("Not yet implemented");
+      final PTransformNode transform) {
+    return new FlattenEvaluator<>(transform);
   }
 
-  private static class FlattenEvaluator<InputT> implements 
TransformEvaluator<InputT> {
-    private final UncommittedBundle<InputT> outputBundle;
-    private final TransformResult<InputT> result;
+  private class FlattenEvaluator<InputT> implements TransformEvaluator<InputT> 
{
+    private final PTransformNode transform;
+    private final UncommittedBundle<InputT> bundle;
 
-    public FlattenEvaluator(
-        UncommittedBundle<InputT> outputBundle, TransformResult<InputT> 
result) {
-      this.outputBundle = outputBundle;
-      this.result = result;
+    FlattenEvaluator(PTransformNode transform) {
+      this.transform = transform;
+      PCollectionNode output = getOnlyElement(graph.getProduced(transform));
+      bundle = bundleFactory.createBundle(output);
     }
 
     @Override
     public void processElement(WindowedValue<InputT> element) {
-      outputBundle.add(element);
+      bundle.add(element);
     }
 
     @Override
     public TransformResult<InputT> finishBundle() {
-      return result;
+      return 
StepTransformResult.<InputT>withoutHold(transform).addOutput(bundle).build();
     }
   }
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java
index ab1538e1c7f..2a9e2582474 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactory.java
@@ -17,41 +17,56 @@
  */
 package org.apache.beam.runners.direct.portable;
 
-import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.getOnlyElement;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
-import org.apache.beam.runners.direct.portable.StepTransformResult.Builder;
+import org.apache.beam.runners.direct.ExecutableGraph;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 
 /**
- * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link GroupByKeyOnly} {@link PTransform}.
+ * The {@code DirectRunner} {@link TransformEvaluatorFactory} for the {@link 
GroupByKeyOnly} {@link
+ * PTransform}.
  */
 class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
-  private final EvaluationContext evaluationContext;
+  private final Components components;
 
-  GroupByKeyOnlyEvaluatorFactory(EvaluationContext evaluationContext) {
-    this.evaluationContext = evaluationContext;
+  private final BundleFactory bundleFactory;
+  private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
+
+  GroupByKeyOnlyEvaluatorFactory(
+      Components components,
+      BundleFactory bundleFactory,
+      ExecutableGraph<PTransformNode, PCollectionNode> graph) {
+    this.components = components;
+    this.bundleFactory = bundleFactory;
+    this.graph = graph;
   }
 
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
-      PTransformNode application,
-      CommittedBundle<?> inputBundle) {
+      PTransformNode application, CommittedBundle<?> inputBundle) {
     @SuppressWarnings({"cast", "unchecked", "rawtypes"})
     TransformEvaluator<InputT> evaluator = (TransformEvaluator) 
createEvaluator(application);
     return evaluator;
@@ -61,7 +76,7 @@
   public void cleanup() {}
 
   private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(final 
PTransformNode application) {
-    return new GroupByKeyOnlyEvaluator<>(evaluationContext, application);
+    return new GroupByKeyOnlyEvaluator<>(application);
   }
 
   /**
@@ -70,35 +85,52 @@ public void cleanup() {}
    *
    * @see GroupByKeyViaGroupByKeyOnly
    */
-  private static class GroupByKeyOnlyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, V>> {
-    private final EvaluationContext evaluationContext;
-
-    private final PTransformNode application;
+  private class GroupByKeyOnlyEvaluator<K, V> implements 
TransformEvaluator<KV<K, V>> {
     private final Coder<K> keyCoder;
-    private Map<StructuralKey<K>, List<WindowedValue<V>>> groupingMap;
-
-    public GroupByKeyOnlyEvaluator(
-        EvaluationContext evaluationContext,
-        PTransformNode application) {
-      this.evaluationContext = evaluationContext;
-      this.application = application;
-      this.keyCoder = null;
-      this.groupingMap = new HashMap<>();
-      throw new UnsupportedOperationException("Not yet migrated");
+    private final Map<StructuralKey<K>, List<WindowedValue<V>>> groupingMap;
+
+    private final PCollectionNode outputPCollection;
+    private final StepTransformResult.Builder<KV<K, V>> resultBuilder;
+
+    private GroupByKeyOnlyEvaluator(PTransformNode application) {
+      keyCoder = getKeyCoder(application);
+      groupingMap = new HashMap<>();
+      outputPCollection = getOnlyElement(graph.getProduced(application));
+      resultBuilder = StepTransformResult.withoutHold(application);
     }
 
-    private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) {
-      checkState(
-          coder instanceof KvCoder,
-          "%s requires a coder of class %s."
-              + " This is an internal error; this is checked during pipeline 
construction"
-              + " but became corrupted.",
-          getClass().getSimpleName(),
-          KvCoder.class.getSimpleName());
-      @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, V>) coder).getKeyCoder();
-      return keyCoder;
+    private Coder<K> getKeyCoder(PTransformNode application) {
+      PCollectionNode inputPCollection = 
getOnlyElement(graph.getPerElementInputs(application));
+      try {
+        // We know the type restrictions on the input PCollection, and the 
restrictions on the
+        // Wire coder
+        MessageWithComponents wireCoderProto =
+            WireCoders.createRunnerWireCoder(
+                inputPCollection, components, components::containsCoders);
+        Coder<WindowedValue<KV<K, V>>> wireCoder =
+            (Coder<WindowedValue<KV<K, V>>>)
+                CoderTranslation.fromProto(
+                    wireCoderProto.getCoder(),
+                    
RehydratedComponents.forComponents(wireCoderProto.getComponents()));
+
+        checkArgument(
+            wireCoder instanceof WindowedValue.WindowedValueCoder,
+            "Wire %s must be a %s",
+            Coder.class.getSimpleName(),
+            WindowedValueCoder.class.getSimpleName());
+        WindowedValueCoder<KV<K, V>> windowedValueCoder = 
(WindowedValueCoder<KV<K, V>>) wireCoder;
+
+        checkArgument(
+            windowedValueCoder.getValueCoder() instanceof KvCoder,
+            "Input elements to %s must be encoded with a %s",
+            DirectGroupByKey.DirectGroupByKeyOnly.class.getSimpleName(),
+            KvCoder.class.getSimpleName());
+        KvCoder<K, V> kvCoder = (KvCoder<K, V>) 
windowedValueCoder.getValueCoder();
+
+        return kvCoder.getKeyCoder();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -113,19 +145,17 @@ public void processElement(WindowedValue<KV<K, V>> 
element) {
 
     @Override
     public TransformResult<KV<K, V>> finishBundle() {
-      Builder resultBuilder = StepTransformResult.withoutHold(application);
       for (Map.Entry<StructuralKey<K>, List<WindowedValue<V>>> groupedEntry :
           groupingMap.entrySet()) {
         K key = groupedEntry.getKey().getKey();
         KeyedWorkItem<K, V> groupedKv =
             KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        PCollectionNode outputNode = null;
         UncommittedBundle<KeyedWorkItem<K, V>> bundle =
-            evaluationContext.createKeyedBundle(StructuralKey.of(key, 
keyCoder), outputNode);
+            bundleFactory.createKeyedBundle(StructuralKey.of(key, keyCoder), 
outputPCollection);
         bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
         resultBuilder.addOutput(bundle);
       }
-      throw new UnsupportedOperationException("Not yet migrated");
+      return resultBuilder.build();
     }
   }
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java
index 4f00de1a8a4..b5df4b21fea 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactory.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.runners.direct.portable;
 
+import static com.google.common.collect.Iterables.getOnlyElement;
+
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Collections;
 import javax.annotation.Nullable;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.direct.ExecutableGraph;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -30,17 +33,22 @@
 
 /** The evaluator for the {@link Impulse} transform. Produces only empty byte 
arrays. */
 class ImpulseEvaluatorFactory implements TransformEvaluatorFactory {
-  private final EvaluationContext ctxt;
+  private final BundleFactory bundleFactory;
+  private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
 
-  ImpulseEvaluatorFactory(EvaluationContext ctxt) {
-    this.ctxt = ctxt;
+  ImpulseEvaluatorFactory(
+      BundleFactory bundleFactory, ExecutableGraph<PTransformNode, 
PCollectionNode> graph) {
+    this.bundleFactory = bundleFactory;
+    this.graph = graph;
   }
 
   @Nullable
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       PTransformNode application, CommittedBundle<?> inputBundle) {
-    return (TransformEvaluator<InputT>) new ImpulseEvaluator(ctxt,  
application);
+    return (TransformEvaluator<InputT>)
+        new ImpulseEvaluator(
+            bundleFactory, application, 
getOnlyElement(graph.getProduced(application)));
   }
 
   @Override
@@ -49,22 +57,24 @@ public void cleanup() {
   }
 
   private static class ImpulseEvaluator implements 
TransformEvaluator<ImpulseShard> {
-    private final EvaluationContext ctxt;
-    private final PTransformNode transform;
     private final StepTransformResult.Builder<ImpulseShard> result;
 
-    private ImpulseEvaluator(EvaluationContext ctxt, PTransformNode transform) 
{
-      this.ctxt = ctxt;
-      this.transform = transform;
-      this.result = StepTransformResult.withoutHold(transform);
+    private final BundleFactory factory;
+    private final PCollectionNode outputPCollection;
+
+    private ImpulseEvaluator(
+        BundleFactory factory, PTransformNode application, PCollectionNode 
outputPCollection) {
+      this.factory = factory;
+      result = StepTransformResult.withoutHold(application);
+      this.outputPCollection = outputPCollection;
     }
 
     @Override
     public void processElement(WindowedValue<ImpulseShard> element) throws 
Exception {
-      PCollectionNode outputPCollection = null;
       result.addOutput(
-          
ctxt.createBundle(outputPCollection).add(WindowedValue.valueInGlobalWindow(new 
byte[0])));
-      throw new UnsupportedOperationException("Not yet migrated");
+          factory
+              .createBundle(outputPCollection)
+              .add(WindowedValue.valueInGlobalWindow(new byte[0])));
     }
 
     @Override
@@ -78,17 +88,17 @@ public void processElement(WindowedValue<ImpulseShard> 
element) throws Exception
    * {@link ImpulseShard}.
    */
   static class ImpulseRootProvider implements RootInputProvider<ImpulseShard> {
-    private final EvaluationContext ctxt;
+    private final BundleFactory bundleFactory;
 
-    ImpulseRootProvider(EvaluationContext ctxt) {
-      this.ctxt = ctxt;
+    ImpulseRootProvider(BundleFactory bundleFactory) {
+      this.bundleFactory = bundleFactory;
     }
 
     @Override
     public Collection<CommittedBundle<ImpulseShard>> getInitialInputs(
         PTransformNode transform, int targetParallelism) {
       return Collections.singleton(
-          ctxt.<ImpulseShard>createRootBundle()
+          bundleFactory.<ImpulseShard>createRootBundle()
               .add(WindowedValue.valueInGlobalWindow(new ImpulseShard()))
               .commit(BoundedWindow.TIMESTAMP_MIN_VALUE));
     }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java
index 2834db79a0f..e9702ba34fe 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RootProviderRegistry.java
@@ -34,10 +34,12 @@
  */
 class RootProviderRegistry {
   /** Returns a {@link RootProviderRegistry} that only supports the {@link 
Impulse} primitive. */
-  public static RootProviderRegistry impulseRegistry(EvaluationContext 
context) {
+  public static RootProviderRegistry impulseRegistry(BundleFactory 
bundleFactory) {
     return new RootProviderRegistry(
         ImmutableMap.<String, RootInputProvider<?>>builder()
-            .put(IMPULSE_TRANSFORM_URN, new 
ImpulseEvaluatorFactory.ImpulseRootProvider(context))
+            .put(
+                IMPULSE_TRANSFORM_URN,
+                new ImpulseEvaluatorFactory.ImpulseRootProvider(bundleFactory))
             .build());
   }
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java
index f3177cdf8ab..306132346d3 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/FlattenEvaluatorFactoryTest.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.runners.direct.portable;
 
+import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
@@ -29,13 +29,9 @@
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
-import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.hamcrest.Matchers;
 import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -45,46 +41,57 @@
 public class FlattenEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
-  @Rule public TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
-  @Ignore("TODO: BEAM-4240 Enable when the Flatten Evaluator Factory is fully 
migrated")
   @Test
   public void testFlattenInMemoryEvaluator() throws Exception {
     PCollectionNode left =
         PipelineNode.pCollection("left", 
PCollection.newBuilder().setUniqueName("left").build());
     PCollectionNode right =
         PipelineNode.pCollection("right", 
PCollection.newBuilder().setUniqueName("right").build());
+    // Include a root node for a sane-looking graph
+    PTransformNode source =
+        PipelineNode.pTransform(
+            "source",
+            PTransform.newBuilder()
+                .putOutputs("left", left.getId())
+                .putOutputs("right", right.getId())
+                .build());
 
+    PCollectionNode flattened =
+        PipelineNode.pCollection("flat", 
PCollection.newBuilder().setUniqueName("flat").build());
     PTransformNode flatten =
         PipelineNode.pTransform(
             "flatten",
             PTransform.newBuilder()
                 .setUniqueName("flatten")
+                .putInputs("left", left.getId())
+                .putInputs("right", right.getId())
+                .putOutputs("out", flattened.getId())
                 .setSpec(
                     
FunctionSpec.newBuilder().setUrn(PTransformTranslation.FLATTEN_TRANSFORM_URN))
                 .build());
 
-    PCollectionNode flattened =
-        PipelineNode.pCollection("flat", 
PCollection.newBuilder().setUniqueName("flat").build());
+    PortableGraph graph =
+        PortableGraph.forPipeline(
+            RunnerApi.Pipeline.newBuilder()
+                .addRootTransformIds(source.getId())
+                .addRootTransformIds(flatten.getId())
+                .setComponents(
+                    RunnerApi.Components.newBuilder()
+                        .putTransforms(source.getId(), source.getTransform())
+                        .putPcollections(left.getId(), left.getPCollection())
+                        .putPcollections(right.getId(), right.getPCollection())
+                        .putTransforms(flatten.getId(), flatten.getTransform())
+                        .putPcollections(flattened.getId(), 
flattened.getPCollection()))
+                .build());
 
     CommittedBundle<Integer> leftBundle =
         bundleFactory.<Integer>createBundle(left).commit(Instant.now());
     CommittedBundle<Integer> rightBundle =
         bundleFactory.<Integer>createBundle(right).commit(Instant.now());
 
-    EvaluationContext context = mock(EvaluationContext.class);
-
-    UncommittedBundle<Integer> flattenedLeftBundle = 
bundleFactory.createBundle(flattened);
-    UncommittedBundle<Integer> flattenedRightBundle = 
bundleFactory.createBundle(flattened);
-
-    when(context.<Integer>createBundle(flattened))
-        .thenReturn(flattenedLeftBundle, flattenedRightBundle);
-
-    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context);
-    TransformEvaluator<Integer> leftSideEvaluator =
-        factory.forApplication(flatten, leftBundle);
-    TransformEvaluator<Integer> rightSideEvaluator =
-        factory.forApplication(flatten, rightBundle);
+    FlattenEvaluatorFactory factory = new 
FlattenEvaluatorFactory(bundleFactory, graph);
+    TransformEvaluator<Integer> leftSideEvaluator = 
factory.forApplication(flatten, leftBundle);
+    TransformEvaluator<Integer> rightSideEvaluator = 
factory.forApplication(flatten, rightBundle);
 
     leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1));
     rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
@@ -99,19 +106,14 @@ public void testFlattenInMemoryEvaluator() throws 
Exception {
     TransformResult<Integer> rightSideResult = 
rightSideEvaluator.finishBundle();
     TransformResult<Integer> leftSideResult = leftSideEvaluator.finishBundle();
 
-    assertThat(rightSideResult.getOutputBundles(), 
Matchers.contains(flattenedRightBundle));
-    assertThat(rightSideResult.getTransform(), Matchers.equalTo(flatten));
-    assertThat(leftSideResult.getOutputBundles(), 
Matchers.contains(flattenedLeftBundle));
-    assertThat(leftSideResult.getTransform(), Matchers.equalTo(flatten));
-
     assertThat(
-        flattenedLeftBundle.commit(Instant.now()).getElements(),
+        
getOnlyElement(leftSideResult.getOutputBundles()).commit(Instant.now()),
         containsInAnyOrder(
             WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)),
             WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING),
             WindowedValue.valueInGlobalWindow(1)));
     assertThat(
-        flattenedRightBundle.commit(Instant.now()).getElements(),
+        
getOnlyElement(rightSideResult.getOutputBundles()).commit(Instant.now()),
         containsInAnyOrder(
             WindowedValue.valueInGlobalWindow(2, 
PaneInfo.ON_TIME_AND_ONLY_FIRING),
             WindowedValue.timestampedValueInGlobalWindow(-4, new 
Instant(-4096)),
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java
index 64541254f16..174fe1d56a7 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -19,33 +19,36 @@
 
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multiset;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
 import org.apache.beam.runners.local.StructuralKey;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.joda.time.Instant;
-import org.junit.Ignore;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -55,31 +58,53 @@
 public class GroupByKeyOnlyEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
-  @Rule public TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
   @Test
-  @Ignore("TODO: BEAM-4240 Not yet migrated")
   public void testInMemoryEvaluator() throws Exception {
-    KV<String, Integer> firstFoo = KV.of("foo", -1);
-    KV<String, Integer> secondFoo = KV.of("foo", 1);
-    KV<String, Integer> thirdFoo = KV.of("foo", 3);
-    KV<String, Integer> firstBar = KV.of("bar", 22);
-    KV<String, Integer> secondBar = KV.of("bar", 12);
-    KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
+    KvCoder<String, Integer> javaCoder = KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of());
+    SdkComponents components = SdkComponents.create();
+    String windowingStrategyId =
+        
components.registerWindowingStrategy(WindowingStrategy.globalDefault());
+    String coderId = components.registerCoder(javaCoder);
+    MessageWithComponents javaWireCoderAndComponents =
+        LengthPrefixUnknownCoders.forCoder(coderId, components.toComponents(), 
false);
+    Coder<KV<String, Integer>> javaWireCoder =
+        (Coder<KV<String, Integer>>)
+            CoderTranslation.fromProto(
+                javaWireCoderAndComponents.getCoder(),
+                
RehydratedComponents.forComponents(javaWireCoderAndComponents.getComponents()));
+
+    MessageWithComponents runnerWireCoderAndComponents =
+        LengthPrefixUnknownCoders.forCoder(coderId, components.toComponents(), 
true);
+    Coder<KV<?, ?>> runnerWireCoder =
+        (Coder<KV<?, ?>>)
+            CoderTranslation.fromProto(
+                runnerWireCoderAndComponents.getCoder(),
+                
RehydratedComponents.forComponents(runnerWireCoderAndComponents.getComponents()));
 
-    KvCoder<String, Integer> kvCoder =
-        KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of());
+    KV<?, ?> firstFoo = asRunnerKV(javaWireCoder, runnerWireCoder, 
KV.of("foo", -1));
+    KV<?, ?> secondFoo = asRunnerKV(javaWireCoder, runnerWireCoder, 
KV.of("foo", 1));
+    KV<?, ?> thirdFoo = asRunnerKV(javaWireCoder, runnerWireCoder, 
KV.of("foo", 3));
+    KV<?, ?> firstBar = asRunnerKV(javaWireCoder, runnerWireCoder, 
KV.of("bar", 22));
+    KV<?, ?> secondBar = asRunnerKV(javaWireCoder, runnerWireCoder, 
KV.of("bar", 12));
+    KV<?, ?> firstBaz = asRunnerKV(javaWireCoder, runnerWireCoder, 
KV.of("baz", Integer.MAX_VALUE));
 
+    PTransformNode inputTransform =
+        PipelineNode.pTransform(
+            "source", PTransform.newBuilder().putOutputs("out", 
"values").build());
     PCollectionNode values =
         PipelineNode.pCollection(
             "values",
             RunnerApi.PCollection.newBuilder()
                 .setUniqueName("values")
-                .setCoderId("kvCoder")
+                .setCoderId(coderId)
+                .setWindowingStrategyId(windowingStrategyId)
                 .build());
     PCollectionNode groupedKvs =
         PipelineNode.pCollection(
-            "groupedKvs", 
RunnerApi.PCollection.newBuilder().setUniqueName("groupedKvs").build());
+            "groupedKvs",
+            RunnerApi.PCollection.newBuilder()
+                .setUniqueName("groupedKvs")
+                .build());
     PTransformNode groupByKeyOnly =
         PipelineNode.pTransform(
             "gbko",
@@ -88,36 +113,27 @@ public void testInMemoryEvaluator() throws Exception {
                 .putOutputs("output", "groupedKvs")
                 
.setSpec(FunctionSpec.newBuilder().setUrn(DirectGroupByKey.DIRECT_GBKO_URN).build())
                 .build());
+    Pipeline pipeline =
+        Pipeline.newBuilder()
+            .addRootTransformIds(inputTransform.getId())
+            .addRootTransformIds(groupByKeyOnly.getId())
+            .setComponents(
+                components
+                    .toComponents()
+                    .toBuilder()
+                    .putTransforms(inputTransform.getId(), 
inputTransform.getTransform())
+                    .putTransforms(groupByKeyOnly.getId(), 
groupByKeyOnly.getTransform())
+                    .putPcollections(values.getId(), values.getPCollection())
+                    .putPcollections(groupedKvs.getId(), 
groupedKvs.getPCollection()))
+            .build();
+
+    PortableGraph graph = PortableGraph.forPipeline(pipeline);
 
     CommittedBundle<KV<String, Integer>> inputBundle =
         bundleFactory.<KV<String, 
Integer>>createBundle(values).commit(Instant.now());
-    EvaluationContext evaluationContext = mock(EvaluationContext.class);
-
-    StructuralKey<String> fooKey = StructuralKey.of("foo", 
StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
-        bundleFactory.createKeyedBundle(fooKey, groupedKvs);
-    StructuralKey<String> barKey = StructuralKey.of("bar", 
StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
-        bundleFactory.createKeyedBundle(barKey, groupedKvs);
-    StructuralKey<String> bazKey = StructuralKey.of("baz", 
StringUtf8Coder.of());
-    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
-        bundleFactory.createKeyedBundle(bazKey, groupedKvs);
-
-    when(evaluationContext.<String, KeyedWorkItem<String, 
Integer>>createKeyedBundle(
-            fooKey, groupedKvs))
-        .thenReturn(fooBundle);
-    when(evaluationContext.<String, KeyedWorkItem<String, 
Integer>>createKeyedBundle(
-            barKey, groupedKvs))
-        .thenReturn(barBundle);
-    when(evaluationContext.<String, KeyedWorkItem<String, 
Integer>>createKeyedBundle(
-            bazKey, groupedKvs))
-        .thenReturn(bazBundle);
 
-    // The input to a GroupByKey is assumed to be a KvCoder
-    @SuppressWarnings("unchecked")
-    Coder<String> keyCoder = kvCoder.getKeyCoder();
-    TransformEvaluator<KV<String, Integer>> evaluator =
-        new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
+    TransformEvaluator<KV<?, ?>> evaluator =
+        new GroupByKeyOnlyEvaluatorFactory(pipeline.getComponents(), 
bundleFactory, graph)
             .forApplication(groupByKeyOnly, inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
@@ -127,36 +143,68 @@ public void testInMemoryEvaluator() throws Exception {
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz));
 
-    evaluator.finishBundle();
+    TransformResult<KV<?, ?>> result = evaluator.finishBundle();
 
+    // The input to a GroupByKey is assumed to be a KvCoder
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    Coder runnerKeyCoder = ((KvCoder) runnerWireCoder).getKeyCoder();
+    CommittedBundle<?> fooBundle = null;
+    CommittedBundle<?> barBundle = null;
+    CommittedBundle<?> bazBundle = null;
+    StructuralKey fooKey = StructuralKey.of(firstFoo.getKey(), runnerKeyCoder);
+    StructuralKey barKey = StructuralKey.of(firstBar.getKey(), runnerKeyCoder);
+    StructuralKey bazKey = StructuralKey.of(firstBaz.getKey(), runnerKeyCoder);
+    for (UncommittedBundle<?> groupedBundle : result.getOutputBundles()) {
+      CommittedBundle<?> groupedCommitted = 
groupedBundle.commit(Instant.now());
+      if (fooKey.equals(groupedCommitted.getKey())) {
+        fooBundle = groupedCommitted;
+      } else if (barKey.equals(groupedCommitted.getKey())) {
+        barBundle = groupedCommitted;
+      } else if (bazKey.equals(groupedCommitted.getKey())) {
+        bazBundle = groupedCommitted;
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Unknown Key %s", groupedCommitted.getKey()));
+      }
+    }
     assertThat(
-        fooBundle.commit(Instant.now()).getElements(),
+        fooBundle,
         contains(
-            new KeyedWorkItemMatcher<>(
+            new KeyedWorkItemMatcher(
                 KeyedWorkItems.elementsWorkItem(
-                    "foo",
+                    fooKey.getKey(),
                     ImmutableSet.of(
-                        WindowedValue.valueInGlobalWindow(-1),
-                        WindowedValue.valueInGlobalWindow(1),
-                        WindowedValue.valueInGlobalWindow(3))),
-                keyCoder)));
+                        WindowedValue.valueInGlobalWindow(firstFoo.getValue()),
+                        
WindowedValue.valueInGlobalWindow(secondFoo.getValue()),
+                        
WindowedValue.valueInGlobalWindow(thirdFoo.getValue()))),
+                runnerKeyCoder)));
     assertThat(
-        barBundle.commit(Instant.now()).getElements(),
+        barBundle,
         contains(
             new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
-                    "bar",
+                    barKey.getKey(),
                     ImmutableSet.of(
-                        WindowedValue.valueInGlobalWindow(12),
-                        WindowedValue.valueInGlobalWindow(22))),
-                keyCoder)));
+                        WindowedValue.valueInGlobalWindow(firstBar.getValue()),
+                        
WindowedValue.valueInGlobalWindow(secondBar.getValue()))),
+                runnerKeyCoder)));
     assertThat(
-        bazBundle.commit(Instant.now()).getElements(),
+        bazBundle,
         contains(
             new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
-                    "baz", 
ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
-                keyCoder)));
+                    bazKey.getKey(),
+                    
ImmutableSet.of(WindowedValue.valueInGlobalWindow(firstBaz.getValue()))),
+                runnerKeyCoder)));
+  }
+
+  private KV<?, ?> asRunnerKV(
+      Coder<KV<String, Integer>> javaWireCoder,
+      Coder<KV<?, ?>> runnerWireCoder,
+      KV<String, Integer> value)
+      throws org.apache.beam.sdk.coders.CoderException {
+    return CoderUtils.decodeFromByteArray(
+        runnerWireCoder, CoderUtils.encodeToByteArray(javaWireCoder, value));
   }
 
   private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java
index 7d0fc53f9bd..591603027fb 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ImpulseEvaluatorFactoryTest.java
@@ -23,17 +23,18 @@
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.direct.ExecutableGraph;
 import 
org.apache.beam.runners.direct.portable.ImpulseEvaluatorFactory.ImpulseRootProvider;
 import 
org.apache.beam.runners.direct.portable.ImpulseEvaluatorFactory.ImpulseShard;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -41,16 +42,13 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link ImpulseEvaluatorFactory}. */
 @RunWith(JUnit4.class)
-@Ignore("Not yet migrated")
 public class ImpulseEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
@@ -62,9 +60,20 @@
                   FunctionSpec.newBuilder()
                       .setUrn(PTransformTranslation.IMPULSE_TRANSFORM_URN)
                       .build())
+              .putOutputs("output", "impulse.out")
+              .build());
+  private PCollectionNode impulseOut =
+      PipelineNode.pCollection(
+          "impulse.out", 
RunnerApi.PCollection.newBuilder().setUniqueName("impulse.out").build());
+  private ExecutableGraph<PTransformNode, PCollectionNode> graph =
+      PortableGraph.forPipeline(
+          RunnerApi.Pipeline.newBuilder()
+              .addRootTransformIds("impulse")
+              .setComponents(
+                  Components.newBuilder()
+                      .putTransforms("impulse", 
impulseApplication.getTransform())
+                      .putPcollections("impulse.out", 
impulseOut.getPCollection()))
               .build());
-
-  @Mock private EvaluationContext context;
 
   @Before
   public void setup() {
@@ -73,17 +82,13 @@ public void setup() {
 
   @Test
   public void testImpulse() throws Exception {
-    PCollectionNode impulseOut =
-        PipelineNode.pCollection(
-            "impulse.out", 
RunnerApi.PCollection.newBuilder().setUniqueName("impulse.out").build());
 
-    ImpulseEvaluatorFactory factory = new ImpulseEvaluatorFactory(context);
+    ImpulseEvaluatorFactory factory = new 
ImpulseEvaluatorFactory(bundleFactory, graph);
 
     WindowedValue<ImpulseShard> inputShard = 
WindowedValue.valueInGlobalWindow(new ImpulseShard());
     CommittedBundle<ImpulseShard> inputShardBundle =
         
bundleFactory.<ImpulseShard>createRootBundle().add(inputShard).commit(Instant.now());
 
-    
when(context.createBundle(impulseOut)).thenReturn(bundleFactory.createBundle(impulseOut));
     TransformEvaluator<ImpulseShard> evaluator =
         factory.forApplication(impulseApplication, inputShardBundle);
     evaluator.processElement(inputShard);
@@ -112,8 +117,7 @@ public void testImpulse() throws Exception {
 
   @Test
   public void testRootProvider() {
-    ImpulseRootProvider rootProvider = new ImpulseRootProvider(context);
-    
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
+    ImpulseRootProvider rootProvider = new ImpulseRootProvider(bundleFactory);
 
     Collection<? extends CommittedBundle<?>> inputs =
         rootProvider.getInitialInputs(impulseApplication, 100);
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
index 34a17e41b45..b8405f4eb02 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java
@@ -16,15 +16,12 @@
 
 package org.apache.beam.runners.fnexecution.wire;
 
-import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
-
 import com.google.common.collect.Sets;
 import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
-import org.apache.beam.model.pipeline.v1.RunnerApi.StandardCoders;
 import org.apache.beam.runners.core.construction.ModelCoders;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
@@ -33,9 +30,6 @@
  * Utilities for replacing or wrapping unknown coders with {@link 
LengthPrefixCoder}.
  */
 public class LengthPrefixUnknownCoders {
-  private static final String BYTES_CODER_TYPE = ModelCoders.BYTES_CODER_URN;
-  private static final String LENGTH_PREFIX_CODER_TYPE = 
ModelCoders.LENGTH_PREFIX_CODER_URN;
-
   /**
    * Recursively traverse the coder tree and wrap the first unknown coder in 
every branch with a
    * {@link LengthPrefixCoder} unless an ancestor coder is itself a {@link 
LengthPrefixCoder}. If
@@ -67,8 +61,7 @@
     //     rebuild the coder by recursively length prefixing any unknown 
component coders.
     //  3) the requested coder is an unknown coder. In this case we either 
wrap the requested coder
     //     with a length prefix coder or replace it with a length prefix byte 
array coder.
-    if (getUrn(StandardCoders.Enum.LENGTH_PREFIX)
-        .equals(currentCoder.getSpec().getSpec().getUrn())) {
+    if (ModelCoders.LENGTH_PREFIX_CODER_URN.equals(currentCoder.getSpec().getSpec().getUrn())) {
       if (replaceWithByteArrayCoder) {
         return createLengthPrefixByteArrayCoder(coderId, components);
       }
@@ -132,11 +125,12 @@ private static MessageWithComponents 
lengthPrefixUnknownCoder(
       rvalBuilder.getComponentsBuilder().putCoders(coderId, currentCoder);
     }
 
-    rvalBuilder.getCoderBuilder()
+    rvalBuilder
+        .getCoderBuilder()
         .addComponentCoderIds(lengthPrefixComponentCoderId)
         .getSpecBuilder()
         .getSpecBuilder()
-        .setUrn(getUrn(StandardCoders.Enum.LENGTH_PREFIX));
+        .setUrn(ModelCoders.LENGTH_PREFIX_CODER_URN);
     return rvalBuilder.build();
   }
 
@@ -153,14 +147,14 @@ private static MessageWithComponents 
createLengthPrefixByteArrayCoder(
     byteArrayCoder
         .getSpecBuilder()
         .getSpecBuilder()
-        .setUrn(getUrn(StandardCoders.Enum.BYTES));
+        .setUrn(ModelCoders.BYTES_CODER_URN);
     rvalBuilder.getComponentsBuilder().putCoders(byteArrayCoderId,
         byteArrayCoder.build());
     rvalBuilder.getCoderBuilder()
         .addComponentCoderIds(byteArrayCoderId)
         .getSpecBuilder()
         .getSpecBuilder()
-        .setUrn(getUrn(StandardCoders.Enum.LENGTH_PREFIX));
+        .setUrn(ModelCoders.LENGTH_PREFIX_CODER_URN);
 
     return rvalBuilder.build();
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 102281)
    Time Spent: 4h 50m  (was: 4h 40m)

> The DirectRunner should interact with a Pipeline via an abstraction of the 
> Graph rather than SDK types
> ------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-4073
>                 URL: https://issues.apache.org/jira/browse/BEAM-4073
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to