[ 
https://issues.apache.org/jira/browse/BEAM-2899?focusedWorklogId=108184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-108184
 ]

ASF GitHub Bot logged work on BEAM-2899:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Jun/18 20:15
            Start Date: 01/Jun/18 20:15
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5494: 
[BEAM-2899][BEAM-3329][BEAM-3328] Link up the Portable DirectRunner
URL: https://github.com/apache/beam/pull/5494
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would otherwise not display it after the merge:

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectExecutionContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectExecutionContext.java
deleted file mode 100644
index c27c762a8eb..00000000000
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectExecutionContext.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.direct.portable;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import org.apache.beam.runners.core.StepContext;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.direct.Clock;
-import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
-import org.apache.beam.runners.local.StructuralKey;
-
-/**
- * Execution Context for the {@code DirectRunner}.
- *
- * <p>This implementation is not thread safe. A new {@link 
DirectExecutionContext} must be created
- * for each thread that requires it.
- */
-class DirectExecutionContext {
-  private final Clock clock;
-  private final StructuralKey<?> key;
-  private final CopyOnAccessInMemoryStateInternals existingState;
-  private final TransformWatermarks watermarks;
-  private Map<String, DirectStepContext> cachedStepContexts = new 
LinkedHashMap<>();
-
-  public DirectExecutionContext(
-      Clock clock,
-      StructuralKey<?> key,
-      CopyOnAccessInMemoryStateInternals existingState,
-      TransformWatermarks watermarks) {
-    this.clock = clock;
-    this.key = key;
-    this.existingState = existingState;
-    this.watermarks = watermarks;
-  }
-
-  private DirectStepContext createStepContext() {
-    return new DirectStepContext();
-  }
-
-  /**
-   * Returns the {@link StepContext} associated with the given step.
-   */
-  public DirectStepContext getStepContext(String stepName) {
-    return cachedStepContexts.computeIfAbsent(stepName, k -> 
createStepContext());
-  }
-
-  /**
-   * Step Context for the {@code DirectRunner}.
-   */
-  public class DirectStepContext implements StepContext {
-    private CopyOnAccessInMemoryStateInternals<?> stateInternals;
-    private DirectTimerInternals timerInternals;
-
-    public DirectStepContext() { }
-
-    @Override
-    public CopyOnAccessInMemoryStateInternals<?> stateInternals() {
-      if (stateInternals == null) {
-        stateInternals = 
CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
-      }
-      return stateInternals;
-    }
-
-    @Override
-    public DirectTimerInternals timerInternals() {
-      if (timerInternals == null) {
-        timerInternals =
-            DirectTimerInternals.create(clock, watermarks, 
TimerUpdate.builder(key));
-      }
-      return timerInternals;
-    }
-
-    /**
-     * Commits the state of this step, and returns the committed state. If the 
step has not
-     * accessed any state, return null.
-     */
-    public CopyOnAccessInMemoryStateInternals commitState() {
-      if (stateInternals != null) {
-        return stateInternals.commit();
-      }
-      return null;
-    }
-
-    /**
-     * Gets the timer update of the {@link TimerInternals} of this {@link 
DirectStepContext},
-     * which is empty if the {@link TimerInternals} were never accessed.
-     */
-    public TimerUpdate getTimerUpdate() {
-      if (timerInternals == null) {
-        return TimerUpdate.empty();
-      }
-      return timerInternals.getTimerUpdate();
-    }
-  }
-}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
index 98094c7087f..2eaf0ec990a 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
@@ -24,7 +24,6 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
@@ -85,8 +84,6 @@ private DirectJobBundleFactory(
         stageBundleFactories.computeIfAbsent(executableStage, 
this::createBundleFactory);
   }
 
-  private final AtomicLong idgen = new AtomicLong();
-
   private <T> StageBundleFactory<T> createBundleFactory(ExecutableStage stage) 
{
     RemoteEnvironment remoteEnv =
         environments.computeIfAbsent(
@@ -100,12 +97,13 @@ private DirectJobBundleFactory(
             });
     SdkHarnessClient sdkHarnessClient =
         SdkHarnessClient.usingFnApiClient(
-            remoteEnv.getInstructionRequestHandler(), 
dataService.getService());
+                remoteEnv.getInstructionRequestHandler(), 
dataService.getService())
+            .withIdGenerator(idGenerator);
     ExecutableProcessBundleDescriptor descriptor;
     try {
       descriptor =
           ProcessBundleDescriptors.fromExecutableStage(
-              Long.toString(idgen.getAndIncrement()), stage, 
dataService.getApiServiceDescriptor());
+              idGenerator.getId(), stage, 
dataService.getApiServiceDescriptor());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -151,8 +149,7 @@ private DirectStageBundleFactory(
 
     @Override
     public RemoteBundle<T> getBundle(
-        OutputReceiverFactory outputReceiverFactory, StateRequestHandler 
stateRequestHandler)
-        throws Exception {
+        OutputReceiverFactory outputReceiverFactory, StateRequestHandler 
stateRequestHandler) {
       Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
       for (Map.Entry<Target, Coder<WindowedValue<?>>> targetCoders :
           descriptor.getOutputTargetCoders().entrySet()) {
@@ -167,7 +164,7 @@ private DirectStageBundleFactory(
             outputReceiverFactory.create(bundleOutputPCollection);
         outputReceivers.put(
             targetCoders.getKey(),
-            RemoteOutputReceiver.of(targetCoders.getValue(), outputReceiver));
+            RemoteOutputReceiver.of((Coder) targetCoders.getValue(), 
outputReceiver));
       }
       return processor.newBundle(outputReceivers, stateRequestHandler);
     }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectStateAndTimers.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectStateAndTimers.java
new file mode 100644
index 00000000000..3f5060b6a8a
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectStateAndTimers.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import org.apache.beam.runners.direct.Clock;
+import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.local.StructuralKey;
+
+/**
+ * State and Timer access for the {@link ReferenceRunner}.
+ *
+ * <p>This provides per-key, per-stage access to {@link 
CopyOnAccessInMemoryStateInternals} and
+ * {@link DirectTimerInternals} for transforms that require access to state or 
timers.
+ *
+ * <p>This implementation is not thread safe. A new {@link 
DirectStateAndTimers} must be created for
+ * each thread that requires it.
+ */
+class DirectStateAndTimers<K> implements StepStateAndTimers<K> {
+  private final StructuralKey<K> key;
+  private final CopyOnAccessInMemoryStateInternals existingState;
+
+  private final Clock clock;
+  private final TransformWatermarks watermarks;
+
+  private CopyOnAccessInMemoryStateInternals<K> stateInternals;
+  private DirectTimerInternals timerInternals;
+
+  DirectStateAndTimers(
+      StructuralKey<K> key,
+      CopyOnAccessInMemoryStateInternals existingState,
+      Clock clock,
+      TransformWatermarks watermarks) {
+    this.key = key;
+    this.existingState = existingState;
+    this.clock = clock;
+    this.watermarks = watermarks;
+  }
+
+  @Override
+  public CopyOnAccessInMemoryStateInternals<K> stateInternals() {
+    if (stateInternals == null) {
+      stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, 
existingState);
+    }
+    return stateInternals;
+  }
+
+  @Override
+  public DirectTimerInternals timerInternals() {
+    if (timerInternals == null) {
+      timerInternals = DirectTimerInternals.create(clock, watermarks, 
TimerUpdate.builder(key));
+    }
+    return timerInternals;
+  }
+}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java
index e387272e9a4..b4ed030609e 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContext.java
@@ -57,9 +57,9 @@
  * <p>{@link EvaluationContext} contains shared state for an execution of the 
{@code DirectRunner}
  * that can be used while evaluating a {@link PTransform}. This consists of 
views into underlying
  * state and watermark implementations, access to read and write {@link 
PCollectionView
- * PCollectionViews}, and managing the {@link DirectExecutionContext 
ExecutionContexts}. This
- * includes executing callbacks asynchronously when state changes to the 
appropriate point (e.g.
- * when a {@link PCollectionView} is requested and known to be empty).
+ * PCollectionViews}, and managing the {@link DirectStateAndTimers 
ExecutionContexts}. This includes
+ * executing callbacks asynchronously when state changes to the appropriate 
point (e.g. when a
+ * {@link PCollectionView} is requested and known to be empty).
  *
  * <p>{@link EvaluationContext} also handles results by committing finalizing 
bundles based on the
  * current global state and updating the global state appropriately. This 
includes updating the
@@ -271,47 +271,22 @@ public void scheduleAfterOutputWouldBeProduced(
       WindowingStrategy<?, ?> windowingStrategy,
       Runnable runnable) {
     PTransformNode producing = graph.getProducer(value);
-    callbackExecutor.callOnGuaranteedFiring(producing, window, 
windowingStrategy, runnable);
-
-    fireAvailableCallbacks(producing);
-  }
-
-  /**
-   * Schedule a callback to be executed after the given window is expired.
-   *
-   * <p>For example, upstream state associated with the window may be cleared.
-   */
-  public void scheduleAfterWindowExpiration(
-      PTransformNode producing,
-      BoundedWindow window,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Runnable runnable) {
     callbackExecutor.callOnWindowExpiration(producing, window, 
windowingStrategy, runnable);
 
     fireAvailableCallbacks(producing);
   }
 
-  /**
-   * Get a {@link DirectExecutionContext} for the provided {@link 
PTransformNode} and key.
-   */
-  public DirectExecutionContext getExecutionContext(
-      PTransformNode application, StructuralKey<?> key) {
+  /** Get a {@link DirectStateAndTimers} for the provided {@link 
PTransformNode} and key. */
+  public <K> StepStateAndTimers<K> getStateAndTimers(
+      PTransformNode application, StructuralKey<K> key) {
     StepAndKey stepAndKey = StepAndKey.of(application, key);
-    return new DirectExecutionContext(
-        clock,
+    return new DirectStateAndTimers<>(
         key,
         applicationStateInternals.get(stepAndKey),
+        clock,
         watermarkManager.getWatermarks(application));
   }
 
-
-  /**
-   * Get the Step Name for the provided application.
-   */
-  String getStepName(PTransformNode application) {
-    throw new UnsupportedOperationException("getStepName Unsupported");
-  }
-
   /** Returns all of the steps in this {@link Pipeline}. */
   Collection<PTransformNode> getSteps() {
     return graph.getExecutables();
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContextStepStateAndTimersProvider.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContextStepStateAndTimersProvider.java
new file mode 100644
index 00000000000..f6ef49c2c34
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/EvaluationContextStepStateAndTimersProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct.portable;
+
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.local.StructuralKey;
+
+/** A {@link StepStateAndTimers.Provider} that uses an {@link 
EvaluationContext}. */
+class EvaluationContextStepStateAndTimersProvider implements 
StepStateAndTimers.Provider {
+  public static StepStateAndTimers.Provider forContext(EvaluationContext 
context) {
+    return new EvaluationContextStepStateAndTimersProvider(context);
+  }
+
+  private final EvaluationContext context;
+
+  private EvaluationContextStepStateAndTimersProvider(EvaluationContext 
context) {
+    this.context = context;
+  }
+
+  @Override
+  public <K> StepStateAndTimers<K> forStepAndKey(PTransformNode transform, 
StructuralKey<K> key) {
+    return context.getStateAndTimers(transform, key);
+  }
+}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
index 0e5eeec8ae3..4b1d91714b2 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ExecutorServiceParallelExecutor.java
@@ -62,8 +62,10 @@
   private final int targetParallelism;
   private final ExecutorService executorService;
 
+  private final RootProviderRegistry rootRegistry;
   private final TransformEvaluatorRegistry registry;
 
+  private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
   private final EvaluationContext evaluationContext;
 
   private final TransformExecutorFactory executorFactory;
@@ -75,12 +77,21 @@
   private AtomicReference<State> pipelineState = new 
AtomicReference<>(State.RUNNING);
 
   public static ExecutorServiceParallelExecutor create(
-      int targetParallelism, TransformEvaluatorRegistry registry, 
EvaluationContext context) {
-    return new ExecutorServiceParallelExecutor(targetParallelism, registry, 
context);
+      int targetParallelism,
+      RootProviderRegistry rootRegistry,
+      TransformEvaluatorRegistry transformRegistry,
+      ExecutableGraph<PTransformNode, PCollectionNode> graph,
+      EvaluationContext context) {
+    return new ExecutorServiceParallelExecutor(
+        targetParallelism, rootRegistry, transformRegistry, graph, context);
   }
 
   private ExecutorServiceParallelExecutor(
-      int targetParallelism, TransformEvaluatorRegistry registry, 
EvaluationContext context) {
+      int targetParallelism,
+      RootProviderRegistry rootRegistry,
+      TransformEvaluatorRegistry registry,
+      ExecutableGraph<PTransformNode, PCollectionNode> graph,
+      EvaluationContext context) {
     this.targetParallelism = targetParallelism;
     // Don't use Daemon threads for workers. The Pipeline should continue to 
execute even if there
     // are no other active threads (for example, because waitUntilFinish was 
not called)
@@ -91,7 +102,9 @@ private ExecutorServiceParallelExecutor(
                 .setThreadFactory(MoreExecutors.platformThreadFactory())
                 .setNameFormat("direct-runner-worker")
                 .build());
+    this.rootRegistry = rootRegistry;
     this.registry = registry;
+    this.graph = graph;
     this.evaluationContext = context;
 
     // Weak Values allows TransformExecutorServices that are no longer in use 
to be reclaimed.
@@ -129,9 +142,7 @@ public TransformExecutorService load(StepAndKey stepAndKey) 
throws Exception {
   }
 
   @Override
-  public void start(
-      ExecutableGraph<PTransformNode, PCollectionNode> graph,
-      RootProviderRegistry rootProviderRegistry) {
+  public void start() {
     int numTargetSplits = Math.max(3, targetParallelism);
     ImmutableMap.Builder<PTransformNode, 
ConcurrentLinkedQueue<CommittedBundle<?>>>
         pendingRootBundles = ImmutableMap.builder();
@@ -139,7 +150,7 @@ public void start(
       ConcurrentLinkedQueue<CommittedBundle<?>> pending = new 
ConcurrentLinkedQueue<>();
       try {
         Collection<CommittedBundle<?>> initialInputs =
-            rootProviderRegistry.getInitialInputs(root, numTargetSplits);
+            rootRegistry.getInitialInputs(root, numTargetSplits);
         pending.addAll(initialInputs);
       } catch (Exception e) {
         throw UserCodeException.wrap(e);
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java
index 2e67c8a2e02..b8737b34d06 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/GroupAlsoByWindowEvaluatorFactory.java
@@ -68,13 +68,13 @@
   private final BundleFactory bundleFactory;
   private final ExecutableGraph<PTransformNode, PCollectionNode> graph;
   private final Components components;
-  private final StateAndTimerProvider stp;
+  private final StepStateAndTimers.Provider stp;
 
   GroupAlsoByWindowEvaluatorFactory(
       ExecutableGraph<PTransformNode, PCollectionNode> graph,
       Components components,
       BundleFactory bundleFactory,
-      StateAndTimerProvider stp) {
+      StepStateAndTimers.Provider stp) {
     this.bundleFactory = bundleFactory;
     this.graph = graph;
     this.components = components;
@@ -98,7 +98,7 @@ public void cleanup() {}
     @SuppressWarnings("unchecked")
     StructuralKey<K> key = (StructuralKey<K>) inputBundle.getKey();
     return new GroupAlsoByWindowEvaluator<>(
-        bundleFactory, key, application, graph, components, stp);
+        bundleFactory, key, application, graph, components, 
stp.forStepAndKey(application, key));
   }
 
   /**
@@ -132,14 +132,14 @@ private GroupAlsoByWindowEvaluator(
         PTransformNode application,
         ExecutableGraph<PTransformNode, PCollectionNode> graph,
         Components components,
-        StateAndTimerProvider stp) {
+        StepStateAndTimers<K> stp) {
       this.bundleFactory = bundleFactory;
       this.application = application;
       this.outputCollection = getOnlyElement(graph.getProduced(application));
       this.key = key;
 
-      this.stateInternals = stp.stateInternals(application, key);
-      this.timerInternals = stp.timerInternals(application, key);
+      this.stateInternals = stp.stateInternals();
+      this.timerInternals = stp.timerInternals();
 
       PCollectionNode inputCollection = 
getOnlyElement(graph.getPerElementInputs(application));
       try {
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java
index c921fada195..98e3cd61400 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/PipelineExecutor.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.direct.portable;
 
-import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.runners.direct.ExecutableGraph;
 import org.apache.beam.sdk.Pipeline;
@@ -34,9 +33,7 @@
    * Starts this executor on the provided graph. The {@link 
RootProviderRegistry} will be used to
    * create initial inputs for the provide {@link ExecutableGraph graph}.
    */
-  void start(
-      ExecutableGraph<PTransformNode, PCollectionNode> graph,
-      RootProviderRegistry rootProviderRegistry);
+  void start();
 
   /**
    * Blocks until the job being executed enters a terminal state. A job is 
completed after all root
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
index a9cf1471235..143a0ba82d1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/QuiescenceDriver.java
@@ -20,6 +20,7 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -157,8 +158,8 @@ private void fireTimers() {
         Collection<TimerData> delivery = transformTimers.getTimers();
         KeyedWorkItem<?, Object> work =
             KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), 
delivery);
-        // TODO: Extract from graph
-        PCollectionNode inputPCollection = null;
+        PCollectionNode inputPCollection =
+            
Iterables.getOnlyElement(graph.getPerElementInputs(transformTimers.getExecutable()));
         @SuppressWarnings({"unchecked", "rawtypes"})
         CommittedBundle<?> bundle =
             evaluationContext
@@ -170,7 +171,6 @@ private void fireTimers() {
             bundle, transformTimers.getExecutable(), new 
TimerIterableCompletionCallback(delivery));
         state.set(ExecutorState.ACTIVE);
       }
-      throw new UnsupportedOperationException();
     } catch (Exception e) {
       LOG.error("Internal Error while delivering timers", e);
       pipelineMessageReceiver.failed(e);
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
new file mode 100644
index 00000000000..18b2a7d85b4
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/ReferenceRunner.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ComponentsOrBuilder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.ModelCoders.KvCoderComponents;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
+import 
org.apache.beam.runners.core.construction.graph.ProtoOverrides.TransformReplacement;
+import org.apache.beam.runners.direct.ExecutableGraph;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import 
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** The "ReferenceRunner" engine implementation. */
+class ReferenceRunner {
+  private final RunnerApi.Pipeline pipeline;
+  private final Struct options;
+
+  private ReferenceRunner(RunnerApi.Pipeline p, Struct options) throws 
IOException {
+    this.pipeline = executable(p);
+    this.options = options;
+  }
+
+  static ReferenceRunner forPipeline(RunnerApi.Pipeline p, Struct options) 
throws IOException {
+    return new ReferenceRunner(p, options);
+  }
+
+  private RunnerApi.Pipeline executable(RunnerApi.Pipeline original) {
+    RunnerApi.Pipeline withGbks =
+        ProtoOverrides.updateTransform(
+            PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+            original,
+            new PortableGroupByKeyReplacer());
+    return GreedyPipelineFuser.fuse(withGbks).toPipeline();
+  }
+
+  public void execute() throws Exception {
+    ExecutableGraph<PTransformNode, PCollectionNode> graph = 
PortableGraph.forPipeline(pipeline);
+    BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+    EvaluationContext ctxt =
+        EvaluationContext.create(Instant::new, bundleFactory, graph, 
Collections.emptySet());
+    RootProviderRegistry rootRegistry = 
RootProviderRegistry.impulseRegistry(bundleFactory);
+    int targetParallelism = 
Math.max(Runtime.getRuntime().availableProcessors(), 3);
+    ServerFactory serverFactory = createServerFactory();
+    ControlClientPool controlClientPool = MapControlClientPool.create();
+    ExecutorService dataExecutor = Executors.newCachedThreadPool();
+    try (GrpcFnServer<GrpcLoggingService> logging =
+            GrpcFnServer.allocatePortAndCreateFor(
+                GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
+        GrpcFnServer<FnApiControlClientPoolService> control =
+            GrpcFnServer.allocatePortAndCreateFor(
+                FnApiControlClientPoolService.offeringClientsToPool(
+                    controlClientPool.getSink(),
+                    GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
+                serverFactory);
+        GrpcFnServer<GrpcDataService> data =
+            GrpcFnServer.allocatePortAndCreateFor(
+                GrpcDataService.create(dataExecutor), serverFactory);
+        GrpcFnServer<GrpcStateService> state =
+            GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), 
serverFactory)) {
+
+      EnvironmentFactory environmentFactory =
+          InProcessEnvironmentFactory.create(
+              PipelineOptionsTranslation.fromProto(options),
+              logging,
+              control,
+              controlClientPool.getSource());
+      JobBundleFactory jobBundleFactory =
+          DirectJobBundleFactory.create(environmentFactory, data, state);
+
+      TransformEvaluatorRegistry transformRegistry =
+          TransformEvaluatorRegistry.portableRegistry(
+              graph,
+              pipeline.getComponents(),
+              bundleFactory,
+              jobBundleFactory,
+              EvaluationContextStepStateAndTimersProvider.forContext(ctxt));
+      ExecutorServiceParallelExecutor executor =
+          ExecutorServiceParallelExecutor.create(
+              targetParallelism, rootRegistry, transformRegistry, graph, ctxt);
+      executor.start();
+      executor.waitUntilFinish(Duration.ZERO);
+    } finally {
+      dataExecutor.shutdown();
+    }
+  }
+
+  private ServerFactory createServerFactory() {
+    return InProcessServerFactory.create();
+  }
+
+  @VisibleForTesting
+  static class PortableGroupByKeyReplacer implements TransformReplacement {
+    @Override
+    public MessageWithComponents getReplacement(String gbkId, 
ComponentsOrBuilder components) {
+      PTransform gbk = components.getTransformsOrThrow(gbkId);
+      checkArgument(
+          
PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN.equals(gbk.getSpec().getUrn()),
+          "URN must be %s, got %s",
+          PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+          gbk.getSpec().getUrn());
+      String inputId = getOnlyElement(gbk.getInputsMap().values());
+      PCollection input = components.getPcollectionsOrThrow(inputId);
+
+      Coder inputCoder = components.getCodersOrThrow(input.getCoderId());
+      KvCoderComponents kvComponents = 
ModelCoders.getKvCoderComponents(inputCoder);
+      String windowCoderId =
+          components
+              .getWindowingStrategiesOrThrow(input.getWindowingStrategyId())
+              .getWindowCoderId();
+      // This coder isn't actually required for the pipeline to function 
properly - the KWIs can be
+      // passed around as pure java objects with no coding of the values, but 
it approximates a full
+      // pipeline.
+      Coder intermediateCoder =
+          Coder.newBuilder()
+              .setSpec(
+                  SdkFunctionSpec.newBuilder()
+                      
.setSpec(FunctionSpec.newBuilder().setUrn("beam:direct:keyedworkitem:v1")))
+              .addAllComponentCoderIds(
+                  ImmutableList.of(
+                      kvComponents.keyCoderId(), kvComponents.valueCoderId(), 
windowCoderId))
+              .build();
+      String intermediateCoderId =
+          SyntheticComponents.uniqueId(
+              String.format(
+                  "keyed_work_item(%s:%s)", kvComponents.keyCoderId(), 
kvComponents.valueCoderId()),
+              components::containsCoders);
+
+      String partitionedId =
+          SyntheticComponents.uniqueId(
+              String.format("%s.%s", inputId, "partitioned"), 
components::containsPcollections);
+      // The partitioned PCollection has the same WindowingStrategy as the 
input, as no merging will
+      // have been performed, so elements remain in their original windows
+      PCollection partitioned =
+          
input.toBuilder().setUniqueName(partitionedId).setCoderId(intermediateCoderId).build();
+      String gbkoId =
+          SyntheticComponents.uniqueId(
+              String.format("%s/GBKO", gbkId), components::containsTransforms);
+      PTransform gbko =
+          PTransform.newBuilder()
+              .putAllInputs(gbk.getInputsMap())
+              
.setSpec(FunctionSpec.newBuilder().setUrn(DirectGroupByKey.DIRECT_GBKO_URN))
+              .putOutputs("output", partitionedId)
+              .build();
+      String gabwId =
+          SyntheticComponents.uniqueId(
+              String.format("%s/GABW", gbkId), components::containsTransforms);
+      PTransform gabw =
+          PTransform.newBuilder()
+              .putInputs("input", partitionedId)
+              
.setSpec(FunctionSpec.newBuilder().setUrn(DirectGroupByKey.DIRECT_GABW_URN))
+              .putAllOutputs(gbk.getOutputsMap())
+              .build();
+      Components newComponents =
+          Components.newBuilder()
+              .putCoders(intermediateCoderId, intermediateCoder)
+              .putPcollections(partitionedId, partitioned)
+              .putTransforms(gbkoId, gbko)
+              .putTransforms(gabwId, gabw)
+              .build();
+      return MessageWithComponents.newBuilder()
+          
.setPtransform(gbk.toBuilder().addSubtransforms(gbkoId).addSubtransforms(gabwId).build())
+          .setComponents(newComponents)
+          .build();
+    }
+  }
+}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StateAndTimerProvider.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepStateAndTimers.java
similarity index 82%
rename from 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StateAndTimerProvider.java
rename to 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepStateAndTimers.java
index d7fa6bd6c4a..9126eb470a4 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StateAndTimerProvider.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/StepStateAndTimers.java
@@ -24,9 +24,12 @@
 import org.apache.beam.runners.local.StructuralKey;
 
 /** A provider of {@link StateInternals} and {@link TimerInternals}. */
-interface StateAndTimerProvider {
-  <K> CopyOnAccessInMemoryStateInternals<K> stateInternals(
-      PTransformNode transform, StructuralKey<K> key);
+interface StepStateAndTimers<K> {
+  interface Provider {
+    <K> StepStateAndTimers<K> forStepAndKey(PTransformNode transform, 
StructuralKey<K> key);
+  }
 
-  DirectTimerInternals timerInternals(PTransformNode transform, 
StructuralKey<?> key);
+  CopyOnAccessInMemoryStateInternals<K> stateInternals();
+
+  DirectTimerInternals timerInternals();
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
index 08123d3b870..0a919e406f0 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/TransformEvaluatorRegistry.java
@@ -48,7 +48,7 @@ static TransformEvaluatorRegistry portableRegistry(
       Components components,
       BundleFactory bundleFactory,
       JobBundleFactory jobBundleFactory,
-      StateAndTimerProvider stateAndTimerProvider) {
+      StepStateAndTimers.Provider stepStateAndTimers) {
     return new TransformEvaluatorRegistry(
         ImmutableMap.<String, TransformEvaluatorFactory>builder()
             .put(
@@ -63,7 +63,7 @@ static TransformEvaluatorRegistry portableRegistry(
             .put(
                 DirectGroupByKey.DIRECT_GABW_URN,
                 new GroupAlsoByWindowEvaluatorFactory(
-                    graph, components, bundleFactory, stateAndTimerProvider))
+                    graph, components, bundleFactory, stepStateAndTimers))
             .put(
                 ExecutableStage.URN,
                 new RemoteStageEvaluatorFactory(bundleFactory, 
jobBundleFactory))
@@ -90,6 +90,7 @@ private TransformEvaluatorRegistry(
 
     TransformEvaluatorFactory factory =
         checkNotNull(factories.get(urn), "No evaluator for PTransform \"%s\"", 
urn);
+    LOG.warn("Evaluator Factory {} for PTransform {}", factory, application);
     return factory.forApplication(application, inputBundle);
   }
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
index 9809c481dba..6ce38b53f1d 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/EvaluationContextTest.java
@@ -39,7 +39,6 @@
 import org.apache.beam.runners.direct.ExecutableGraph;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
-import 
org.apache.beam.runners.direct.portable.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -93,13 +92,12 @@ public void setup() {
 
   @Test
   public void getExecutionContextSameStepSameKeyState() {
-    DirectExecutionContext fooContext =
-        context.getExecutionContext(createdProducer, StructuralKey.of("foo", 
StringUtf8Coder.of()));
+    StepStateAndTimers<String> fooContext =
+        context.getStateAndTimers(createdProducer, StructuralKey.of("foo", 
StringUtf8Coder.of()));
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
 
-    DirectStepContext stepContext = fooContext.getStepContext("s1");
-    stepContext.stateInternals().state(StateNamespaces.global(), 
intBag).add(1);
+    fooContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
     context.handleResult(
         ImmutableListBundleFactory.create()
@@ -107,68 +105,56 @@ public void getExecutionContextSameStepSameKeyState() {
             .commit(Instant.now()),
         ImmutableList.of(),
         StepTransformResult.withoutHold(createdProducer)
-            .withState(stepContext.commitState())
+            .withState(fooContext.stateInternals().commit())
             .build());
 
-    DirectExecutionContext secondFooContext =
-        context.getExecutionContext(createdProducer, StructuralKey.of("foo", 
StringUtf8Coder.of()));
+    StepStateAndTimers secondFooContext =
+        context.getStateAndTimers(createdProducer, StructuralKey.of("foo", 
StringUtf8Coder.of()));
     assertThat(
-        secondFooContext
-            .getStepContext("s1")
-            .stateInternals()
-            .state(StateNamespaces.global(), intBag)
-            .read(),
+        secondFooContext.stateInternals().state(StateNamespaces.global(), 
intBag).read(),
         contains(1));
   }
 
   @Test
   public void getExecutionContextDifferentKeysIndependentState() {
-    DirectExecutionContext fooContext =
-        context.getExecutionContext(createdProducer, StructuralKey.of("foo", 
StringUtf8Coder.of()));
+    StepStateAndTimers fooContext =
+        context.getStateAndTimers(createdProducer, StructuralKey.of("foo", 
StringUtf8Coder.of()));
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
 
-    
fooContext.getStepContext("s1").stateInternals().state(StateNamespaces.global(),
 intBag).add(1);
+    fooContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
-    DirectExecutionContext barContext =
-        context.getExecutionContext(createdProducer, StructuralKey.of("bar", 
StringUtf8Coder.of()));
+    StepStateAndTimers barContext =
+        context.getStateAndTimers(createdProducer, StructuralKey.of("bar", 
StringUtf8Coder.of()));
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
-        barContext
-            .getStepContext("s1")
-            .stateInternals()
-            .state(StateNamespaces.global(), intBag)
-            .read(),
+        barContext.stateInternals().state(StateNamespaces.global(), 
intBag).read(),
         emptyIterable());
   }
 
   @Test
   public void getExecutionContextDifferentStepsIndependentState() {
     StructuralKey<?> myKey = StructuralKey.of("foo", StringUtf8Coder.of());
-    DirectExecutionContext fooContext = 
context.getExecutionContext(createdProducer, myKey);
+    StepStateAndTimers fooContext = context.getStateAndTimers(createdProducer, 
myKey);
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
 
-    
fooContext.getStepContext("s1").stateInternals().state(StateNamespaces.global(),
 intBag).add(1);
+    fooContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
-    DirectExecutionContext barContext = 
context.getExecutionContext(downstreamProducer, myKey);
+    StepStateAndTimers barContext = 
context.getStateAndTimers(downstreamProducer, myKey);
     assertThat(
-        barContext
-            .getStepContext("s1")
-            .stateInternals()
-            .state(StateNamespaces.global(), intBag)
-            .read(),
+        barContext.stateInternals().state(StateNamespaces.global(), 
intBag).read(),
         emptyIterable());
   }
 
   @Test
   public void handleResultStoresState() {
     StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), 
ByteArrayCoder.of());
-    DirectExecutionContext fooContext = 
context.getExecutionContext(downstreamProducer, myKey);
+    StepStateAndTimers fooContext = 
context.getStateAndTimers(downstreamProducer, myKey);
 
     StateTag<BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
 
-    CopyOnAccessInMemoryStateInternals<?> state = 
fooContext.getStepContext("s1").stateInternals();
+    CopyOnAccessInMemoryStateInternals<?> state = fooContext.stateInternals();
     BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
     bag.add(1);
     bag.add(2);
@@ -182,11 +168,9 @@ public void handleResultStoresState() {
         ImmutableList.of(),
         stateResult);
 
-    DirectExecutionContext afterResultContext =
-        context.getExecutionContext(downstreamProducer, myKey);
+    StepStateAndTimers afterResultContext = 
context.getStateAndTimers(downstreamProducer, myKey);
 
-    CopyOnAccessInMemoryStateInternals<?> afterResultState =
-        afterResultContext.getStepContext("s1").stateInternals();
+    CopyOnAccessInMemoryStateInternals<?> afterResultState = 
afterResultContext.stateInternals();
     assertThat(afterResultState.state(StateNamespaces.global(), 
intBag).read(), contains(1, 2, 4));
   }
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
new file mode 100644
index 00000000000..2a249ddc269
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for the {@link ReferenceRunner}. */
+@RunWith(JUnit4.class)
+public class ReferenceRunnerTest implements Serializable {
+  @Test
+  public void pipelineExecution() throws Exception {
+    Pipeline p = Pipeline.create();
+    TupleTag<KV<String, Integer>> food = new TupleTag<>();
+    TupleTag<Integer> originals = new TupleTag<Integer>() {};
+    PCollectionTuple parDoOutputs =
+        p.apply(Create.of(1, 2, 3))
+            .apply(
+                ParDo.of(
+                        new DoFn<Integer, KV<String, Integer>>() {
+                          @ProcessElement
+                          public void process(ProcessContext ctxt) {
+                            for (int i = 0; i < ctxt.element(); i++) {
+                              ctxt.outputWithTimestamp(
+                                  KV.of("foo", i), new 
Instant(0).plus(Duration.standardHours(i)));
+                            }
+                            ctxt.output(originals, ctxt.element());
+                          }
+                        })
+                    .withOutputTags(food, TupleTagList.of(originals)));
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5L));
+    PCollection<KV<String, Set<Integer>>> grouped =
+        parDoOutputs
+            .get(food)
+            .apply(Window.into(windowFn))
+            .apply(GroupByKey.create())
+            .apply(
+                ParDo.of(
+                    new DoFn<KV<String, Iterable<Integer>>, KV<String, 
Set<Integer>>>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctxt) {
+                        ctxt.output(
+                            KV.of(
+                                ctxt.element().getKey(),
+                                
ImmutableSet.copyOf(ctxt.element().getValue())));
+                      }
+                    }));
+
+    
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
+
+    ReferenceRunner runner =
+        ReferenceRunner.forPipeline(
+            PipelineTranslation.toProto(p),
+            
PipelineOptionsTranslation.toProto(PipelineOptionsFactory.create()));
+    runner.execute();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 108184)
            Time Spent: 10m
    Remaining Estimate: 0h

> Universal Local Runner
> ----------------------
>
>                 Key: BEAM-2899
>                 URL: https://issues.apache.org/jira/browse/BEAM-2899
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> To make the portability effort tractable, we should implement a Universal 
> Local Runner (ULR) in Java that runs in a single server process plus Docker 
> containers for the SDK harnesses. It would serve multiple purposes:
>   (1) A reference implementation for other runners. Ideally, any new feature 
> should be implemented in the ULR first.
>   (2) A fully featured test runner for SDKs that participate in the 
> portability framework. It thus complements the direct runners.
>   (3) A test runner for user code that depends on or customizes the runtime 
> environment. For example, a DoFn that shells out has a dependency that may be 
> satisfied on the user's desktop (and thus works fine on the direct runner), 
> but perhaps not by the container harness image. The ULR allows for an easy 
> way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to