http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
new file mode 100644
index 0000000..f374f99
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
+ * {@link InProcessPipelineRunner}.
+ *
+ * <p>Each output is added to the {@link UncommittedBundle} registered for its {@link TupleTag}
+ * in the map supplied at construction time. The map is stored as provided; it is not copied.
+ */
+public class InProcessBundleOutputManager implements OutputManager {
+  private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+
+  /**
+   * Creates an {@link InProcessBundleOutputManager} that writes to the provided bundles.
+   *
+   * @param outputBundles the bundle to add elements to for each output {@link TupleTag}
+   */
+  public static InProcessBundleOutputManager create(
+      Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
+    return new InProcessBundleOutputManager(outputBundles);
+  }
+
+  /**
+   * Creates an {@link InProcessBundleOutputManager} that writes to the provided bundles.
+   *
+   * @param bundles the bundle to add elements to for each output {@link TupleTag}
+   */
+  public InProcessBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+    this.bundles = bundles;
+  }
+
+  /**
+   * Adds {@code output} to the bundle registered for {@code tag}.
+   *
+   * @param tag the tag identifying the bundle to write to
+   * @param output the value to add to that bundle
+   * @throws NullPointerException if no bundle is registered for {@code tag}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    @SuppressWarnings("rawtypes")
+    UncommittedBundle bundle = bundles.get(tag);
+    if (bundle == null) {
+      // Same exception type as the bare dereference would raise, but naming the offending tag.
+      throw new NullPointerException("No output bundle registered for tag " + tag);
+    }
+    bundle.add(output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
new file mode 100644
index 0000000..d9a7ff0
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import 
org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
+import 
org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * The evaluation context for a specific pipeline being executed by the
+ * {@link InProcessPipelineRunner}. Contains state shared within the execution 
across all
+ * transforms.
+ *
+ * <p>{@link InProcessEvaluationContext} contains shared state for an 
execution of the
+ * {@link InProcessPipelineRunner} that can be used while evaluating a {@link 
PTransform}. This
+ * consists of views into underlying state and watermark implementations, 
access to read and write
+ * {@link PCollectionView PCollectionViews}, and constructing {@link 
CounterSet CounterSets} and
+ * {@link ExecutionContext ExecutionContexts}. This includes executing 
callbacks asynchronously when
+ * state changes to the appropriate point (e.g. when a {@link PCollectionView} 
is requested and
+ * known to be empty).
+ *
+ * <p>{@link InProcessEvaluationContext} also handles results by committing (finalizing) bundles
+ * based on the current global state and updating the global state appropriately. This includes
+ * updating the per-{@link StepAndKey} state, updating global watermarks, and executing any
+ * callbacks that can be executed.
+ */
+class InProcessEvaluationContext {
+  /** The step name for each {@link AppliedPTransform} in the {@link 
Pipeline}. */
+  private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
+
+  /** The options that were used to create this {@link Pipeline}. */
+  private final InProcessPipelineOptions options;
+
+  private final BundleFactory bundleFactory;
+  /** The current processing time and event time watermarks and timers. */
+  private final InMemoryWatermarkManager watermarkManager;
+
+  /** Executes callbacks based on the progression of the watermark. */
+  private final WatermarkCallbackExecutor callbackExecutor;
+
+  /** The stateInternals of the world, by applied PTransform and key. */
+  private final ConcurrentMap<StepAndKey, 
CopyOnAccessInMemoryStateInternals<?>>
+      applicationStateInternals;
+
+  private final InProcessSideInputContainer sideInputContainer;
+
+  private final CounterSet mergedCounters;
+
+  /**
+   * Creates a new {@link InProcessEvaluationContext}.
+   *
+   * <p>All arguments are passed unchanged to the private constructor, which rejects any
+   * {@code null} argument.
+   */
+  public static InProcessEvaluationContext create(
+      InProcessPipelineOptions options,
+      BundleFactory bundleFactory,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    return new InProcessEvaluationContext(
+        options, bundleFactory, rootTransforms, valueToConsumers, stepNames, 
views);
+  }
+
+  /**
+   * Validates the arguments and builds the state shared across the execution.
+   *
+   * <p>The side input container is created last because it receives a reference to this
+   * context; every other field is assigned before that reference is handed out.
+   *
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  private InProcessEvaluationContext(
+      InProcessPipelineOptions options,
+      BundleFactory bundleFactory,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    this.options = checkNotNull(options);
+    this.bundleFactory = checkNotNull(bundleFactory);
+    checkNotNull(rootTransforms);
+    checkNotNull(valueToConsumers);
+    this.stepNames = checkNotNull(stepNames);
+    checkNotNull(views);
+
+    // NOTE(review): the watermark manager gets its own NanosOffsetClock, while
+    // getExecutionContext hands out options.getClock() - confirm the two clocks are meant to
+    // differ.
+    this.watermarkManager =
+        InMemoryWatermarkManager.create(
+            NanosOffsetClock.create(), rootTransforms, valueToConsumers);
+    this.applicationStateInternals = new ConcurrentHashMap<>();
+    this.mergedCounters = new CounterSet();
+    this.callbackExecutor = WatermarkCallbackExecutor.create();
+
+    // 'this' escapes here, so this must remain the final assignment of the constructor.
+    this.sideInputContainer = InProcessSideInputContainer.create(this, views);
+  }
+
+  /**
+   * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided
+   * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}).
+   *
+   * <p>The result is the output of running the transform contained in the
+   * {@link InProcessTransformResult} on the contents of the provided bundle.
+   *
+   * <p>In order, this method commits the output bundles of the result, updates the watermark
+   * manager, fires callbacks for every known step, merges the counters of the result (if any),
+   * and stores or removes the committed state for the step and key of the result (if the result
+   * has state). The method is {@code synchronized} on this context.
+   *
+   * @param completedBundle the bundle that was processed to produce the result. Potentially
+   *                        {@code null} if the transform that produced the result is a root
+   *                        transform
+   * @param completedTimers the timers that were delivered to produce the {@code completedBundle},
+   *                        or an empty iterable if no timers were delivered
+   * @param result the result of evaluating the input bundle
+   * @return a {@link CommittedResult} created from the {@code result} and its committed,
+   *         non-empty output bundles
+   */
+  public synchronized CommittedResult handleResult(
+      @Nullable CommittedBundle<?> completedBundle,
+      Iterable<TimerData> completedTimers,
+      InProcessTransformResult result) {
+    Iterable<? extends CommittedBundle<?>> committedBundles =
+        commitBundles(result.getOutputBundles());
+    // Update watermarks and timers, using the bundles committed above
+    watermarkManager.updateWatermarks(
+        completedBundle,
+        result.getTransform(),
+        result.getTimerUpdate().withCompletedTimers(completedTimers),
+        committedBundles,
+        result.getWatermarkHold());
+    fireAllAvailableCallbacks();
+    // Update counters; a result may carry no counters at all
+    if (result.getCounters() != null) {
+      mergedCounters.merge(result.getCounters());
+    }
+    // Update state internals; a null state means there is nothing to store for this result
+    CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
+    if (theirState != null) {
+      CopyOnAccessInMemoryStateInternals<?> committedState = 
theirState.commit();
+      StepAndKey stepAndKey =
+          StepAndKey.of(
+              result.getTransform(), completedBundle == null ? null : 
completedBundle.getKey());
+      // Empty committed state is dropped from the map instead of being stored
+      if (!committedState.isEmpty()) {
+        applicationStateInternals.put(stepAndKey, committedState);
+      } else {
+        applicationStateInternals.remove(stepAndKey);
+      }
+    }
+    return CommittedResult.create(result, committedBundles);
+  }
+
+  /**
+   * Commits each provided bundle at the synchronized processing output time of the transform
+   * that produces its {@link PCollection}, and returns the committed bundles that contain at
+   * least one element.
+   */
+  private Iterable<? extends CommittedBundle<?>> commitBundles(
+      Iterable<? extends UncommittedBundle<?>> bundles) {
+    ImmutableList.Builder<CommittedBundle<?>> nonEmpty = ImmutableList.builder();
+    for (UncommittedBundle<?> uncommitted : bundles) {
+      TransformWatermarks producerWatermarks =
+          watermarkManager.getWatermarks(
+              uncommitted.getPCollection().getProducingTransformInternal());
+      CommittedBundle<?> result =
+          uncommitted.commit(producerWatermarks.getSynchronizedProcessingOutputTime());
+      if (Iterables.isEmpty(result.getElements())) {
+        // Empty bundles don't impact watermarks and shouldn't trigger downstream execution
+        continue;
+      }
+      nonEmpty.add(result);
+    }
+    return nonEmpty.build();
+  }
+
+  /**
+   * Calls {@link #fireAvailableCallbacks(AppliedPTransform)} for every transform that has a
+   * step name.
+   */
+  private void fireAllAvailableCallbacks() {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      fireAvailableCallbacks(transform);
+    }
+  }
+
+  /**
+   * Passes the current output watermark of {@code producingTransform} to the callback executor
+   * so it can fire the callbacks registered for that transform.
+   */
+  private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
+    callbackExecutor.fireForWatermark(
+        producingTransform,
+        watermarkManager.getWatermarks(producingTransform).getOutputWatermark());
+  }
+
+  /**
+   * Create a {@link UncommittedBundle} for use by a source.
+   *
+   * <p>Delegates to the {@link BundleFactory} this context was created with.
+   *
+   * @param output the {@link PCollection} the bundle is created for
+   */
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    return bundleFactory.createRootBundle(output);
+  }
+
+  /**
+   * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
+   * PCollection}.
+   *
+   * <p>Delegates to the {@link BundleFactory} this context was created with.
+   *
+   * @param input the committed bundle passed to the factory alongside {@code output}
+   * @param output the {@link PCollection} the elements of the new bundle belong to
+   */
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, 
PCollection<T> output) {
+    return bundleFactory.createBundle(input, output);
+  }
+
+  /**
+   * Create a {@link UncommittedBundle} with the specified key, whose elements belong to the
+   * specified {@link PCollection}. For use by {@link InProcessGroupByKeyOnly}
+   * {@link PTransform PTransforms}.
+   *
+   * <p>Delegates to the {@link BundleFactory} this context was created with.
+   *
+   * @param input the committed bundle passed to the factory alongside {@code key} and
+   *              {@code output}
+   * @param key the key of the new bundle
+   * @param output the {@link PCollection} the elements of the new bundle belong to
+   */
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    return bundleFactory.createKeyedBundle(input, key, output);
+  }
+
+  /**
+   * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided
+   * {@link PCollectionView}.
+   *
+   * <p>Values added to the returned writer are written to the side input container of this
+   * context for {@code output}.
+   *
+   * @param input not read by this method
+   * @param output the view that added values are written to
+   */
+  public <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> 
createPCollectionViewWriter(
+      PCollection<Iterable<ElemT>> input, final PCollectionView<ViewT> output) 
{
+    return new PCollectionViewWriter<ElemT, ViewT>() {
+      @Override
+      public void add(Iterable<WindowedValue<ElemT>> values) {
+        sideInputContainer.write(output, values);
+      }
+    };
+  }
+
+  /**
+   * Schedule a callback to be executed after output would be produced for the given window
+   * if there had been input.
+   *
+   * <p>Output would be produced when the watermark for a {@link PValue} passes the point at
+   * which the trigger for the specified window (with the specified windowing strategy) must have
+   * fired from the perspective of that {@link PValue}, as specified by the value of
+   * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
+   * {@link WindowingStrategy}. When the callback has fired, either values will have been produced
+   * for a key in that window, the window is empty, or all elements in the window are late. The
+   * callback will be executed regardless of whether values have been produced.
+   *
+   * <p>The callback is registered under the transform that produces {@code value}. The current
+   * output watermark of that same transform is then passed to the callback executor.
+   *
+   * @param value the value whose producing transform the callback is registered under
+   * @param window the window the callback is scheduled for
+   * @param windowingStrategy the windowing strategy passed to the callback executor
+   * @param runnable the callback to execute
+   */
+  public void scheduleAfterOutputWouldBeProduced(
+      PValue value,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    AppliedPTransform<?, ?, ?> producing = getProducing(value);
+    callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
+
+    // Fire for the transform the callback was just registered under, instead of repeating the
+    // linear search over all steps, which may return null or a different transform.
+    fireAvailableCallbacks(producing);
+  }
+
+  /**
+   * Returns the transform recorded on {@code value} as its producer, falling back to a search
+   * of the known steps when none is recorded.
+   */
+  private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
+    AppliedPTransform<?, ?, ?> recorded = value.getProducingTransformInternal();
+    return recorded != null ? recorded : lookupProducing(value);
+  }
+
+  /**
+   * Searches the known steps for the first transform whose output equals {@code value}, or
+   * whose expanded output contains it. Returns {@code null} if no step matches.
+   */
+  private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
+    AppliedPTransform<?, ?, ?> found = null;
+    for (AppliedPTransform<?, ?, ?> candidate : stepNames.keySet()) {
+      if (candidate.getOutput().equals(value)
+          || candidate.getOutput().expand().contains(value)) {
+        found = candidate;
+        break;
+      }
+    }
+    return found;
+  }
+
+  /**
+   * Get the options used by this {@link Pipeline}.
+   *
+   * @return the {@link InProcessPipelineOptions} this context was created with
+   */
+  public InProcessPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  /**
+   * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
+   *
+   * <p>The returned context is given the clock from the pipeline options, the key, the state
+   * currently stored for the step and key (or {@code null} if none is stored), and the
+   * watermarks of the transform.
+   *
+   * @param application the transform to get an execution context for
+   * @param key the key to get an execution context for
+   */
+  public InProcessExecutionContext getExecutionContext(
+      AppliedPTransform<?, ?, ?> application, Object key) {
+    StepAndKey stepAndKey = StepAndKey.of(application, key);
+    // State is stored with a wildcard key type because keys are handled as plain Objects; the
+    // cast only changes the erased type argument, so nothing is checked at runtime.
+    @SuppressWarnings("unchecked")
+    CopyOnAccessInMemoryStateInternals<Object> existingState =
+        (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey);
+    return new InProcessExecutionContext(
+        options.getClock(), key, existingState, watermarkManager.getWatermarks(application));
+  }
+
+  /**
+   * Get all of the steps used in this {@link Pipeline}.
+   *
+   * @return the key set of the step name map this context was created with; it is returned
+   *         directly rather than copied
+   */
+  public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
+    return stepNames.keySet();
+  }
+
+  /**
+   * Get the Step Name for the provided application.
+   *
+   * <p>The name is looked up in the step name map this context was created with. For an
+   * application that is not a key of that map, the result is whatever the map returns
+   * (presumably {@code null} - confirm against the map implementation supplied by the caller).
+   */
+  public String getStepName(AppliedPTransform<?, ?, ?> application) {
+    return stepNames.get(application);
+  }
+
+  /**
+   * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided
+   * {@link PCollectionView PCollectionViews}.
+   *
+   * <p>The reader is created by the side input container of this context.
+   *
+   * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
+   * read
+   * @return a {@link ReadyCheckingSideInputReader} that can read all of the provided
+   * {@link PCollectionView PCollectionViews}
+   */
+  public ReadyCheckingSideInputReader createSideInputReader(
+      final List<PCollectionView<?>> sideInputs) {
+    return sideInputContainer.createReaderForViews(sideInputs);
+  }
+
+  /**
+   * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView}
+   * has had its contents set in a window.
+   */
+  static interface ReadyCheckingSideInputReader extends SideInputReader {
+    /**
+     * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
+     *
+     * @param view the view to check
+     * @param window the window to check the view in
+     */
+    boolean isReady(PCollectionView<?> view, BoundedWindow window);
+  }
+
+  /**
+   * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
+   * of all other {@link CounterSet CounterSets} created by this call.
+   *
+   * <p>The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
+   * all created {@link CounterSet CounterSets} when the transforms that call this method
+   * complete.
+   *
+   * @return a new, empty {@link CounterSet}; this method keeps no reference to it
+   */
+  public CounterSet createCounterSet() {
+    return new CounterSet();
+  }
+
+  /**
+   * Returns all of the counters that have been merged into this context via calls to
+   * {@link CounterSet#merge(CounterSet)}.
+   *
+   * @return the {@link CounterSet} held by this context, returned directly rather than copied
+   */
+  public CounterSet getCounters() {
+    return mergedCounters;
+  }
+
+  /**
+   * Extracts all timers that have been fired and have not already been extracted.
+   *
+   * <p>This is a destructive operation. Timers will only appear in the result of this method
+   * once for each time they are set.
+   *
+   * @return the result of extracting fired timers from the watermark manager
+   */
+  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
+    return watermarkManager.extractFiredTimers();
+  }
+
+  /**
+   * Returns true if the step will not produce additional output.
+   *
+   * <p>A transform whose output watermark is before
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity} is never done.
+   *
+   * <p>Otherwise, a transform that produces only {@link IsBounded#BOUNDED}
+   * {@link PCollection PCollections} is done, and a transform that produces any
+   * {@link IsBounded#UNBOUNDED} {@link PCollection} is done only if
+   * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} is true.
+   */
+  public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
+    TransformWatermarks transformWatermarks = watermarkManager.getWatermarks(transform);
+    boolean watermarkBelowMax =
+        transformWatermarks.getOutputWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    if (watermarkBelowMax) {
+      // The watermark can still advance, so more output may be produced.
+      return false;
+    }
+    for (PValue produced : transform.getOutput().expand()) {
+      if (!(produced instanceof PCollection)) {
+        continue;
+      }
+      IsBounded boundedness = ((PCollection<?>) produced).isBounded();
+      boolean unbounded = boundedness.equals(IsBounded.UNBOUNDED);
+      // An unbounded output keeps the transform alive unless the options ask for shutdown.
+      if (unbounded && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
+        return false;
+      }
+    }
+    // The watermark is at positive infinity and no output keeps the transform alive.
+    return true;
+  }
+
+  /**
+   * Returns true if all steps are done.
+   *
+   * <p>Checks {@link #isDone(AppliedPTransform)} for every transform that has a step name and
+   * stops at the first one that is not done.
+   */
+  public boolean isDone() {
+    for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
+      if (!isDone(transform)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
new file mode 100644
index 0000000..44d8bd9
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import 
org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.BaseExecutionContext;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+/**
+ * Execution Context for the {@link InProcessPipelineRunner}.
+ *
+ * <p>This implementation is not thread safe. A new {@link InProcessExecutionContext} must be
+ * created for each thread that requires it.
+ */
+class InProcessExecutionContext
+    extends 
BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
+  private final Clock clock;
+  private final Object key;
+  private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+  private final TransformWatermarks watermarks;
+
+  /**
+   * Creates an execution context; the arguments are stored as provided.
+   *
+   * @param clock the clock passed to the timer internals of each step context
+   * @param key the key passed to the state internals and timer update of each step context
+   * @param existingState the state that the state internals of each step context are created
+   *                      on top of
+   * @param watermarks the watermarks passed to the timer internals of each step context
+   */
+  public InProcessExecutionContext(Clock clock, Object key,
+      CopyOnAccessInMemoryStateInternals<Object> existingState, 
TransformWatermarks watermarks) {
+    this.clock = clock;
+    this.key = key;
+    this.existingState = existingState;
+    this.watermarks = watermarks;
+  }
+
+  /**
+   * Creates a new {@link InProcessStepContext} for the named step and transform, with this
+   * execution context as its {@link ExecutionContext}.
+   */
+  @Override
+  protected InProcessStepContext createStepContext(String stepName, String 
transformName) {
+    return new InProcessStepContext(this, stepName, transformName);
+  }
+
+  /**
+   * Step Context for the {@link InProcessPipelineRunner}.
+   *
+   * <p>State internals and timer internals are created on first access and reused afterwards.
+   */
+  public class InProcessStepContext
+      extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {
+    private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+    private InProcessTimerInternals timerInternals;
+
+    public InProcessStepContext(
+        ExecutionContext executionContext, String stepName, String 
transformName) {
+      super(executionContext, stepName, transformName);
+    }
+
+    /**
+     * Returns the state internals of this step, creating them on the first call from the key
+     * and existing state of the enclosing {@link InProcessExecutionContext}.
+     */
+    @Override
+    public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+      if (stateInternals == null) {
+        stateInternals = 
CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
+      }
+      return stateInternals;
+    }
+
+    /**
+     * Returns the timer internals of this step, creating them on the first call from the clock,
+     * watermarks and key of the enclosing {@link InProcessExecutionContext}.
+     */
+    @Override
+    public InProcessTimerInternals timerInternals() {
+      if (timerInternals == null) {
+        timerInternals =
+            InProcessTimerInternals.create(clock, watermarks, 
TimerUpdate.builder(key));
+      }
+      return timerInternals;
+    }
+
+    /**
+     * Commits the state of this step, and returns the committed state. If the step has not
+     * accessed any state, return null.
+     */
+    public CopyOnAccessInMemoryStateInternals<?> commitState() {
+      if (stateInternals != null) {
+        return stateInternals.commit();
+      }
+      return null;
+    }
+
+    /**
+     * Gets the timer update of the {@link TimerInternals} of this {@link InProcessStepContext},
+     * which is empty if the {@link TimerInternals} were never accessed.
+     */
+    public TimerUpdate getTimerUpdate() {
+      if (timerInternals == null) {
+        return TimerUpdate.empty();
+      }
+      return timerInternals.getTimerUpdate();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
new file mode 100644
index 0000000..d811e1b
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.util.Collection;
+
+/**
+ * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
+ * source and intermediate {@link PTransform PTransforms}.
+ */
+interface InProcessExecutor {
+  /**
+   * Starts this executor. The provided collection is the collection of root transforms to
+   * initially schedule.
+   *
+   * @param rootTransforms the root transforms to schedule initially
+   */
+  void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
+
+  /**
+   * Blocks until the job being executed enters a terminal state. A job is completed after all
+   * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
+   * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
+   *
+   * @throws Throwable whenever an executor thread throws anything, transfers the throwable to
+   *                   the waiting thread and rethrows it
+   */
+  void awaitCompletion() throws Throwable;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
new file mode 100644
index 0000000..512b3bd
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Options that can be used to configure the {@link InProcessPipelineRunner}.
+ */
+public interface InProcessPipelineOptions extends PipelineOptions, 
ApplicationNameOptions {
+  /**
+   * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
+   * to execute {@link PTransform PTransforms}.
+   *
+   * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure
+   * that they cannot enter a state in which they will not schedule additional pending work
+   * unless currently scheduled work completes, as this may cause the {@link Pipeline} to cease
+   * processing.
+   *
+   * <p>Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of
+   * {@link Executors#newCachedThreadPool()}.
+   */
+  @JsonIgnore
+  @Required
+  @Hidden
+  @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class)
+  ExecutorServiceFactory getExecutorServiceFactory();
+
+  void setExecutorServiceFactory(ExecutorServiceFactory executorService);
+
+  /**
+   * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
+   * system time when time values are required by the evaluator.
+   *
+   * <p>The default instance is produced by {@link NanosOffsetClock.Factory}.
+   */
+  @Default.InstanceFactory(NanosOffsetClock.Factory.class)
+  @JsonIgnore
+  @Required
+  @Hidden
+  @Description(
+      "The processing time source used by the pipeline. When the current time 
is "
+          + "needed by the evaluator, the result of clock#now() is used.")
+  Clock getClock();
+
+  void setClock(Clock clock);
+
+  /**
+   * Whether the pipeline should shut down producers which have reached the maximum representable
+   * watermark, even if there are unbounded sources that could produce additional (late) data.
+   *
+   * <p>Defaults to {@code false}.
+   */
+  @Default.Boolean(false)
+  @Description(
+      "If the pipeline should shut down producers which have reached the 
maximum "
+          + "representable watermark. If this is set to true, a pipeline in 
which all PTransforms "
+          + "have reached the maximum watermark will be shut down, even if 
there are unbounded "
+          + "sources that could produce additional (late) data. By default, if 
the pipeline "
+          + "contains any unbounded PCollections, it will run until explicitly 
shut down.")
+  boolean isShutdownUnboundedProducersWithMaxWatermark();
+
+  void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
+
+  /**
+   * Whether a call to {@code Pipeline#run()} should block until all
+   * {@link PTransform PTransforms} are complete.
+   *
+   * <p>Defaults to {@code true}.
+   */
+  @Default.Boolean(true)
+  @Description(
+      "If the pipeline should block awaiting completion of the pipeline. If 
set to true, "
+          + "a call to Pipeline#run() will block until all PTransforms are 
complete. Otherwise, "
+          + "the Pipeline will execute asynchronously. If set to false, the 
completion of the "
+          + "pipeline can be awaited on by use of 
InProcessPipelineResult#awaitCompletion().")
+  boolean isBlockOnRun();
+
+  void setBlockOnRun(boolean b);
+
+  /**
+   * Whether the runner should ensure that the elements of every PCollection are not mutated.
+   *
+   * <p>Defaults to {@code true}.
+   */
+  @Default.Boolean(true)
+  @Description(
+      "Controls whether the runner should ensure that all of the elements of 
every "
+          + "PCollection are not mutated. PTransforms are not permitted to 
mutate input elements "
+          + "at any point, or output elements after they are output.")
+  boolean isTestImmutability();
+
+  void setTestImmutability(boolean test);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
new file mode 100644
index 0000000..bb8c0de
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import 
org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import 
org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
+import 
org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.MapAggregatorValues;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * An In-Memory implementation of the Dataflow Programming Model. Supports 
Unbounded
+ * {@link PCollection PCollections}.
+ */
+@Experimental
+public class InProcessPipelineRunner
+    extends PipelineRunner<InProcessPipelineRunner.InProcessPipelineResult> {
+  /**
+   * The default set of transform overrides to use in the {@link InProcessPipelineRunner}, keyed by
+   * the class of the transform being replaced.
+   *
+   * <p>{@link #apply(PTransform, PInput)} looks up the exact runtime class of the transform being
+   * applied; when a factory is registered for that class, the transform returned by the factory's
+   * {@code override} method is applied instead of the original.
+   *
+   * <p>The map is immutable and the reference is never reassigned, so it is declared
+   * {@code final}.
+   */
+  @SuppressWarnings("rawtypes")
+  private static final Map<Class<? extends PTransform>, PTransformOverrideFactory>
+      defaultTransformOverrides =
+          ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
+              .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
+              .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
+              .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
+              .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
+              .build();
+
+  /**
+   * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
+   * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
+   * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
+   *
+   * @param <T> the type of elements that can be added to this bundle
+   */
+  public static interface UncommittedBundle<T> {
+    /**
+     * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
+     */
+    PCollection<T> getPCollection();
+
+    /**
+     * Outputs an element to this bundle.
+     *
+     * @param element the element to add to this bundle
+     * @return this bundle
+     */
+    UncommittedBundle<T> add(WindowedValue<T> element);
+
+    /**
+     * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
+     * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
+     * will throw an {@link IllegalStateException} if called after a call to commit.
+     *
+     * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
+     *                                   committed
+     * @return the committed bundle holding the elements added before this call
+     */
+    CommittedBundle<T> commit(Instant synchronizedProcessingTime);
+  }
+
+  /**
+   * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
+   * eventually be committed. Committed elements are executed by the {@link PTransform PTransforms}
+   * that consume the {@link PCollection} this bundle is a part of at a later point.
+   *
+   * @param <T> the type of elements contained within this bundle
+   */
+  public static interface CommittedBundle<T> {
+    /**
+     * Returns the PCollection that the elements of this bundle belong to.
+     */
+    PCollection<T> getPCollection();
+
+    /**
+     * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the
+     * execution of this bundle.
+     */
+    @Nullable
+    Object getKey();
+
+    /**
+     * Returns an {@link Iterable} containing all of the elements that have been added to this
+     * {@link CommittedBundle}.
+     */
+    Iterable<WindowedValue<T>> getElements();
+
+    /**
+     * Returns the processing time output watermark at the time the producing {@link PTransform}
+     * committed this bundle. Downstream synchronized processing time watermarks cannot progress
+     * past this point before consuming this bundle.
+     *
+     * <p>This value is no greater than the earliest incomplete processing time or synchronized
+     * processing time {@link TimerData timer} at the time this bundle was committed, including any
+     * timers that fired to produce this bundle.
+     */
+    Instant getSynchronizedProcessingOutputWatermark();
+
+    /**
+     * Return a new {@link CommittedBundle} that is like this one, except calls to
+     * {@link #getElements()} will return the provided elements. This bundle is unchanged.
+     *
+     * <p>The value of the
+     * {@link #getSynchronizedProcessingOutputWatermark() synchronized processing output watermark}
+     * of the returned {@link CommittedBundle} is equal to the value returned from the current
+     * bundle. This is used to ensure a {@link PTransform} that could not complete processing on
+     * input elements properly holds the synchronized processing time to the appropriate value.
+     *
+     * @param elements the elements the returned bundle should report from {@link #getElements()}
+     * @return a new bundle with the provided elements; this bundle is not modified
+     */
+    CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
+  }
+
+  /**
+   * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection}
+   * to a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+   *
+   * @param <ElemT> the type of elements the input {@link PCollection} contains.
+   * @param <ViewT> the type of the PCollectionView this writer writes to.
+   */
+  public static interface PCollectionViewWriter<ElemT, ViewT> {
+    /**
+     * Writes the provided windowed values for the view this writer belongs to.
+     *
+     * <p>NOTE(review): whether repeated calls append to or replace earlier contents is decided by
+     * the implementation, which is not visible from this interface -- confirm before relying on
+     * either behavior.
+     *
+     * @param values the windowed elements to write
+     */
+    void add(Iterable<WindowedValue<ElemT>> values);
+  }
+
+  
////////////////////////////////////////////////////////////////////////////////////////////////
+  private final InProcessPipelineOptions options;
+
+  public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
+    return new 
InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
+  }
+
+  /** Private; instances are obtained through {@link #fromOptions(PipelineOptions)}. */
+  private InProcessPipelineRunner(InProcessPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
+   *
+   * @return the {@link InProcessPipelineOptions} instance this runner was constructed with
+   */
+  public InProcessPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  /**
+   * Applies the provided transform to the input, first substituting the runner-specific
+   * replacement when an override factory is registered for the transform's exact class.
+   */
+  @Override
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    PTransformOverrideFactory factory = defaultTransformOverrides.get(transform.getClass());
+    if (factory == null) {
+      // Nothing is registered for this transform class, so apply the original transform.
+      return super.apply(transform, input);
+    }
+    PTransform<InputT, OutputT> replacement = factory.override(transform);
+    return super.apply(replacement, input);
+  }
+
+  /**
+   * Executes the provided {@link Pipeline} in process and returns a handle to the execution.
+   *
+   * <p>The pipeline is traversed to collect its root transforms, consumers, step names and views,
+   * every {@link PValue} the visitor reports as unfinalized is finished, and an executor backed by
+   * an {@link ExecutorService} created for this run is started on the root transforms. If
+   * {@link InProcessPipelineOptions#isBlockOnRun()} is true, this call waits on
+   * {@link InProcessPipelineResult#awaitCompletion()} before returning.
+   *
+   * @param pipeline the pipeline to execute
+   * @return the result tracking the started execution
+   * @throws PipelineExecutionException if awaiting completion fails with a
+   *     {@link UserCodeException}; the cause of that exception is wrapped
+   */
+  @Override
+  public InProcessPipelineResult run(Pipeline pipeline) {
+    // First traversal: gathers root transforms, value consumers, step names and views.
+    ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new 
ConsumerTrackingPipelineVisitor();
+    pipeline.traverseTopologically(consumerTrackingVisitor);
+    for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) 
{
+      unfinalized.finishSpecifying();
+    }
+    // Second traversal: collects the keyed PValues. The visitor is configured with the
+    // GroupByKey and InProcessGroupByKeyOnly transform classes (presumably the transforms
+    // whose outputs are considered keyed -- confirm against KeyedPValueTrackingVisitor).
+    @SuppressWarnings("rawtypes")
+    KeyedPValueTrackingVisitor keyedPValueVisitor =
+        KeyedPValueTrackingVisitor.create(
+            ImmutableSet.<Class<? extends PTransform>>of(
+                GroupByKey.class, InProcessGroupByKeyOnly.class));
+    pipeline.traverseTopologically(keyedPValueVisitor);
+
+    InProcessEvaluationContext context =
+        InProcessEvaluationContext.create(
+            getPipelineOptions(),
+            createBundleFactory(getPipelineOptions()),
+            consumerTrackingVisitor.getRootTransforms(),
+            consumerTrackingVisitor.getValueToConsumers(),
+            consumerTrackingVisitor.getStepNames(),
+            consumerTrackingVisitor.getViews());
+
+    // independent executor service for each run
+    ExecutorService executorService =
+        context.getPipelineOptions().getExecutorServiceFactory().create();
+    InProcessExecutor executor =
+        ExecutorServiceParallelExecutor.create(
+            executorService,
+            consumerTrackingVisitor.getValueToConsumers(),
+            keyedPValueVisitor.getKeyedPValues(),
+            TransformEvaluatorRegistry.defaultRegistry(),
+            defaultModelEnforcements(options),
+            context);
+    executor.start(consumerTrackingVisitor.getRootTransforms());
+
+    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+        new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
+    InProcessPipelineResult result =
+        new InProcessPipelineResult(executor, context, aggregatorSteps);
+    if (options.isBlockOnRun()) {
+      try {
+        result.awaitCompletion();
+      } catch (UserCodeException userException) {
+        // Surface the user's own exception (the cause) rather than the wrapper.
+        throw new PipelineExecutionException(userException.getCause());
+      } catch (Throwable t) {
+        // Throwables.propagate always throws, so this branch never reaches the return below.
+        Throwables.propagate(t);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Builds the default mapping from transform class to the model enforcements applied to it.
+   * Both single-output and multi-output {@link ParDo} share the same collection of enforcements.
+   */
+  private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+      defaultModelEnforcements(InProcessPipelineOptions options) {
+    Collection<ModelEnforcementFactory> forParDo = createParDoEnforcements(options);
+    return ImmutableMap.<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>builder()
+        .put(ParDo.Bound.class, forParDo)
+        .put(ParDo.BoundMulti.class, forParDo)
+        .build();
+  }
+
+  /**
+   * Returns the enforcements to apply to {@link ParDo} transforms: the immutability enforcement
+   * when {@link InProcessPipelineOptions#isTestImmutability()} is set, otherwise none.
+   */
+  private Collection<ModelEnforcementFactory> createParDoEnforcements(
+      InProcessPipelineOptions options) {
+    if (!options.isTestImmutability()) {
+      return ImmutableList.<ModelEnforcementFactory>of();
+    }
+    return ImmutableList.<ModelEnforcementFactory>of(ImmutabilityEnforcementFactory.create());
+  }
+
+  /**
+   * Creates the {@link BundleFactory} for a run. When
+   * {@link InProcessPipelineOptions#isTestImmutability()} is set, the in-process factory is
+   * wrapped in an {@link ImmutabilityCheckingBundleFactory}.
+   */
+  private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
+    BundleFactory baseFactory = InProcessBundleFactory.create();
+    if (!pipelineOptions.isTestImmutability()) {
+      return baseFactory;
+    }
+    return ImmutabilityCheckingBundleFactory.create(baseFactory);
+  }
+
+  /**
+   * The result of running a {@link Pipeline} with the {@link 
InProcessPipelineRunner}.
+   *
+   * Throws {@link UnsupportedOperationException} for all methods.
+   */
+  public static class InProcessPipelineResult implements PipelineResult {
+    private final InProcessExecutor executor;
+    private final InProcessEvaluationContext evaluationContext;
+    private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> 
aggregatorSteps;
+    private State state;
+
+    private InProcessPipelineResult(
+        InProcessExecutor executor,
+        InProcessEvaluationContext evaluationContext,
+        Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
+      this.executor = executor;
+      this.evaluationContext = evaluationContext;
+      this.aggregatorSteps = aggregatorSteps;
+      // Only ever constructed after the executor has started.
+      this.state = State.RUNNING;
+    }
+
+    @Override
+    public State getState() {
+      return state;
+    }
+
+    @Override
+    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> 
aggregator)
+        throws AggregatorRetrievalException {
+      CounterSet counters = evaluationContext.getCounters();
+      Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
+      Map<String, T> stepValues = new HashMap<>();
+      for (AppliedPTransform<?, ?, ?> transform : 
evaluationContext.getSteps()) {
+        if (steps.contains(transform.getTransform())) {
+          String stepName =
+              String.format(
+                  "user-%s-%s", evaluationContext.getStepName(transform), 
aggregator.getName());
+          Counter<T> counter = (Counter<T>) 
counters.getExistingCounter(stepName);
+          if (counter != null) {
+            stepValues.put(transform.getFullName(), counter.getAggregate());
+          }
+        }
+      }
+      return new MapAggregatorValues<>(stepValues);
+    }
+
+    /**
+     * Blocks until the {@link Pipeline} execution represented by this
+     * {@link InProcessPipelineResult} is complete, returning the terminal 
state.
+     *
+     * <p>If the pipeline terminates abnormally by throwing an exception, this 
will rethrow the
+     * exception. Future calls to {@link #getState()} will return
+     * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
+     *
+     * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED 
unbounded}
+     * {@link PCollection}, and the {@link PipelineRunner} was created with
+     * {@link 
InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to 
false,
+     * this method will never return.
+     *
+     * See also {@link InProcessExecutor#awaitCompletion()}.
+     */
+    public State awaitCompletion() throws Throwable {
+      if (!state.isTerminal()) {
+        try {
+          executor.awaitCompletion();
+          state = State.DONE;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw e;
+        } catch (Throwable t) {
+          state = State.FAILED;
+          throw t;
+        }
+      }
+      return state;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
new file mode 100644
index 0000000..4a09de7
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link 
PipelineOptionsRegistrar} for the
+ * {@link InProcessPipelineRunner}.
+ */
+public class InProcessRegistrar {
+  private InProcessRegistrar() {}
+  /**
+   * Registers the {@link InProcessPipelineRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class InProcessRunner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends 
PipelineRunner<?>>>of(InProcessPipelineRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link InProcessPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class InProcessOptions implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends 
PipelineOptions>>of(InProcessPipelineOptions.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
new file mode 100644
index 0000000..f4980ef
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import 
org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PCollectionViewWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.annotation.Nullable;
+
+/**
+ * An in-process container for {@link PCollectionView PCollectionViews}, which 
provides methods for
+ * constructing {@link SideInputReader SideInputReaders} which block until a 
side input is
+ * available and writing to a {@link PCollectionView}.
+ */
+class InProcessSideInputContainer {
+  private final InProcessEvaluationContext evaluationContext;
+  private final Collection<PCollectionView<?>> containedViews;
+  private final LoadingCache<PCollectionViewWindow<?>,
+      SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows;
+
+  /**
+   * Creates an {@link InProcessSideInputContainer} holding the provided views and backed by the
+   * provided context. The contents of each (view, window) pair are represented by a future that
+   * is created, unset, the first time that pair is looked up.
+   */
+  public static InProcessSideInputContainer create(
+      InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+    CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
+        futureLoader =
+            new CacheLoader<PCollectionViewWindow<?>,
+                SettableFuture<Iterable<? extends WindowedValue<?>>>>() {
+              @Override
+              public SettableFuture<Iterable<? extends WindowedValue<?>>> load(
+                  PCollectionViewWindow<?> viewWindow) {
+                // A cache miss yields a fresh future that nobody has completed yet.
+                return SettableFuture.create();
+              }
+            };
+    LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
+        futuresByViewWindow = CacheBuilder.newBuilder().build(futureLoader);
+    return new InProcessSideInputContainer(context, containedViews, futuresByViewWindow);
+  }
+
+  /**
+   * Private; use {@code create} to obtain instances. The provided views are copied into an
+   * immutable set, so later changes to the caller's collection are not reflected here.
+   */
+  private InProcessSideInputContainer(InProcessEvaluationContext context,
+      Collection<PCollectionView<?>> containedViews,
+      LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends 
WindowedValue<?>>>>
+      viewByWindows) {
+    this.evaluationContext = context;
+    this.containedViews = ImmutableSet.copyOf(containedViews);
+    this.viewByWindows = viewByWindows;
+  }
+
+  /**
+   * Returns a {@link ReadyCheckingSideInputReader} restricted to the provided views. The reader
+   * reads the futures held by this container, so it observes values written to this container
+   * after the reader was created.
+   *
+   * @throws IllegalArgumentException if any of the provided views is not contained in this
+   *     container
+   */
+  public ReadyCheckingSideInputReader createReaderForViews(
+      Collection<PCollectionView<?>> newContainedViews) {
+    if (containedViews.containsAll(newContainedViews)) {
+      return new SideInputContainerSideInputReader(newContainedViews);
+    }
+    // At least one requested view is unknown; report exactly which ones.
+    Set<PCollectionView<?>> known = ImmutableSet.copyOf(containedViews);
+    Set<PCollectionView<?>> requested = ImmutableSet.copyOf(newContainedViews);
+    Set<PCollectionView<?>> unknown = Sets.difference(requested, known);
+    throw new IllegalArgumentException(
+        "Can't create a SideInputReader with unknown views " + unknown);
+  }
+
+  /**
+   * Write the provided values to the provided view.
+   *
+   * <p>The windowed values are first exploded, then for each window the pane 
is determined. For
+   * each window, if the pane is later than the current pane stored within 
this container, write
+   * all of the values to the container as the new values of the {@link 
PCollectionView}.
+   *
+   * <p>The provided iterable is expected to contain only a single window and 
pane.
+   */
+  public void write(PCollectionView<?> view, Iterable<? extends 
WindowedValue<?>> values) {
+    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
+        indexValuesByWindow(values);
+    for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
+        valuesPerWindow.entrySet()) {
+      updatePCollectionViewWindowValues(view, windowValues.getKey(), 
windowValues.getValue());
+    }
+  }
+
+  /**
+   * Groups the provided values by window. A value that is in several
+   * {@link BoundedWindow windows} is listed under each of them.
+   */
+  private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
+      Iterable<? extends WindowedValue<?>> values) {
+    Map<BoundedWindow, Collection<WindowedValue<?>>> byWindow = new HashMap<>();
+    for (WindowedValue<?> value : values) {
+      for (BoundedWindow window : value.getWindows()) {
+        if (!byWindow.containsKey(window)) {
+          // First value seen for this window: start its group.
+          byWindow.put(window, new ArrayList<WindowedValue<?>>());
+        }
+        byWindow.get(window).add(value);
+      }
+    }
+    return byWindow;
+  }
+
+  /**
+   * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
+   * specified values, if the values are part of a later pane than currently exist within the
+   * {@link PCollectionViewWindow}.
+   *
+   * <p>If the future for the (view, window) pair has not completed yet, it is completed with the
+   * provided values. If it has already completed, the pane index of the first provided value is
+   * compared with the pane index of the first stored value; the stored contents are replaced when
+   * they are empty or when the new index is greater. Replacement invalidates the cache entry and
+   * completes the newly loaded future, because the existing future is already done.
+   *
+   * <p>On interruption the thread's interrupt flag is restored and, if the future is still
+   * incomplete, it is completed with an empty list. The cause of an {@link ExecutionException}
+   * is rethrown through {@link Throwables#propagate(Throwable)}.
+   *
+   * <p>Assumes {@code windowValues} is non-empty when the future is already done, since its first
+   * element is read unconditionally in that branch.
+   */
+  private void updatePCollectionViewWindowValues(
+      PCollectionView<?> view, BoundedWindow window, 
Collection<WindowedValue<?>> windowValues) {
+    PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, 
window);
+    SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
+    try {
+      future = viewByWindows.get(windowedView);
+      if (future.isDone()) {
+        Iterator<? extends WindowedValue<?>> existingValues = 
future.get().iterator();
+        PaneInfo newPane = windowValues.iterator().next().getPane();
+        // The current value may have no elements, if no elements were 
produced for the window,
+        // but we are receiving late data.
+        if (!existingValues.hasNext()
+            || newPane.getIndex() > 
existingValues.next().getPane().getIndex()) {
+          // A completed future cannot be set again, so drop the entry and fill a fresh one.
+          viewByWindows.invalidate(windowedView);
+          viewByWindows.get(windowedView).set(windowValues);
+        }
+      } else {
+        future.set(windowValues);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      if (future != null && !future.isDone()) {
+        future.set(Collections.<WindowedValue<?>>emptyList());
+      }
+    } catch (ExecutionException e) {
+      Throwables.propagate(e.getCause());
+    }
+  }
+
+  /**
+   * A {@link ReadyCheckingSideInputReader} over a subset of the views of the enclosing
+   * {@link InProcessSideInputContainer}. Contents are read from the enclosing container's
+   * per-(view, window) futures.
+   */
+  private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
+    private final Collection<PCollectionView<?>> readerViews;
+
+    private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
+      this.readerViews = ImmutableSet.copyOf(readerViews);
+    }
+
+    /**
+     * Returns whether the contents of the provided view in the provided window are available,
+     * i.e. whether the corresponding future has completed.
+     *
+     * @throws IllegalArgumentException if this reader does not contain the view
+     */
+    @Override
+    public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
+      checkArgument(
+          readerViews.contains(view),
+          "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
+              + "Contained views; %s",
+          view,
+          readerViews);
+      return getViewFuture(view, window).isDone();
+    }
+
+    /**
+     * Returns the value of the provided view in the provided window, blocking until the contents
+     * for that window are available.
+     *
+     * @return the view's value, or {@code null} if the calling thread is interrupted while
+     *     waiting (the interrupt flag is restored in that case)
+     * @throws IllegalArgumentException if this reader does not contain the view
+     */
+    @Override
+    @Nullable
+    public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+      // Template form: the message (and view.toString()) is only built when the check fails,
+      // rather than on every successful read. The resulting text is unchanged.
+      checkArgument(
+          readerViews.contains(view), "calling get(PCollectionView) with unknown view: %s", view);
+      try {
+        final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view, window);
+        // Safe covariant cast
+        @SuppressWarnings("unchecked")
+        Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
+        return view.fromIterableInternal(values);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return null;
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Gets the future containing the contents of the provided {@link PCollectionView} in the
+     * provided {@link BoundedWindow}, setting up a callback to populate the future with empty
+     * contents if necessary.
+     */
+    private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
+        final PCollectionView<T> view, final BoundedWindow window)  {
+      PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
+      final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
+          viewByWindows.getUnchecked(windowedView);
+
+      WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
+      evaluationContext.scheduleAfterOutputWouldBeProduced(
+          view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
+      return future;
+    }
+
+    @Override
+    public <T> boolean contains(PCollectionView<T> view) {
+      return readerViews.contains(view);
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return readerViews.isEmpty();
+    }
+  }
+
+  private static class WriteEmptyViewContents implements Runnable {
+    private final PCollectionView<?> view;
+    private final BoundedWindow window;
+    private final SettableFuture<Iterable<? extends WindowedValue<?>>> future;
+
+    private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow 
window,
+        SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
+      this.future = future;
+      this.view = view;
+      this.window = window;
+    }
+
+    @Override
+    public void run() {
+      // The requested window has closed without producing elements, so 
reflect that in
+      // the PCollectionView. If set has already been called, will do nothing.
+      future.set(Collections.<WindowedValue<?>>emptyList());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("view", view)
+          .add("window", window)
+          .toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
new file mode 100644
index 0000000..cd54f59
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import 
org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import 
org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.TimerInternals;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link TimerInternals} where all relevant data exists 
in memory.
+ */
+public class InProcessTimerInternals implements TimerInternals {
+  private final Clock processingTimeClock;
+  private final TransformWatermarks watermarks;
+  private final TimerUpdateBuilder timerUpdateBuilder;
+
+  public static InProcessTimerInternals create(
+      Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder 
timerUpdateBuilder) {
+    return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder);
+  }
+
+  private InProcessTimerInternals(
+      Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder 
timerUpdateBuilder) {
+    this.processingTimeClock = clock;
+    this.watermarks = watermarks;
+    this.timerUpdateBuilder = timerUpdateBuilder;
+  }
+
+  @Override
+  public void setTimer(TimerData timerKey) {
+    timerUpdateBuilder.setTimer(timerKey);
+  }
+
+  @Override
+  public void deleteTimer(TimerData timerKey) {
+    timerUpdateBuilder.deletedTimer(timerKey);
+  }
+
+  public TimerUpdate getTimerUpdate() {
+    return timerUpdateBuilder.build();
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTimeClock.now();
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return watermarks.getSynchronizedProcessingInputTime();
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return watermarks.getInputWatermark();
+  }
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return watermarks.getOutputWatermark();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
new file mode 100644
index 0000000..a132c33
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
+ */
+public interface InProcessTransformResult {
+  /**
+   * Returns the {@link AppliedPTransform} that produced this result.
+   */
+  AppliedPTransform<?, ?, ?> getTransform();
+
+  /**
+   * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
+   * will be committed by the evaluation context as part of completing this result.
+   */
+  Iterable<? extends UncommittedBundle<?>> getOutputBundles();
+
+  /**
+   * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
+   * not use a {@link CounterSet}.
+   */
+  @Nullable CounterSet getCounters();
+
+  /**
+   * Returns the Watermark Hold for the transform at the time this result was produced.
+   *
+   * <p>If the transform does not set any watermark hold, returns
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+   */
+  Instant getWatermarkHold();
+
+  /**
+   * Returns the State used by the transform.
+   *
+   * <p>If this evaluation did not access state, this may return null.
+   */
+  @Nullable
+  CopyOnAccessInMemoryStateInternals<?> getState();
+
+  /**
+   * Returns the {@link TimerUpdate} that was produced as a result of this evaluation. If the
+   * evaluation was triggered due to the delivery of one or more timers, those timers must have
+   * been added to the builder of the {@link TimerUpdate} before it was completed.
+   *
+   * <p>If this evaluation did not add or remove any timers, returns an empty {@link TimerUpdate}.
+   */
+  TimerUpdate getTimerUpdate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
new file mode 100644
index 0000000..b7c755e
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
+ * is the result of a {@link PTransform} that produces keyed outputs. A {@link PTransform} that
+ * produces keyed outputs is assumed to colocate output elements that share a key.
+ *
+ * <p>All {@link GroupByKey} transforms, or their runner-specific implementation primitive, produce
+ * keyed output.
+ *
+ * <p>The visitor becomes finalized when the traversal leaves the root node. Entering or leaving a
+ * composite transform after that point fails the state check, and {@link #getKeyedPValues()} may
+ * only be called once the visitor is finalized.
+ */
+// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
+// unkeyed
+class KeyedPValueTrackingVisitor implements PipelineVisitor {
+  // The transform classes whose outputs are recorded as keyed.
+  @SuppressWarnings("rawtypes")
+  private final Set<Class<? extends PTransform>> producesKeyedOutputs;
+  // The keyed values collected so far during the traversal.
+  private final Set<PValue> keyedValues;
+  // Set to true when the root node is left; never reset afterwards.
+  private boolean finalized;
+
+  /**
+   * Creates a visitor that records the outputs of the provided transform classes as keyed. The
+   * provided set is retained by reference, not copied.
+   */
+  public static KeyedPValueTrackingVisitor create(
+      @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> 
producesKeyedOutputs) {
+    return new KeyedPValueTrackingVisitor(producesKeyedOutputs);
+  }
+
+  private KeyedPValueTrackingVisitor(
+      @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> 
producesKeyedOutputs) {
+    this.producesKeyedOutputs = producesKeyedOutputs;
+    this.keyedValues = new HashSet<>();
+  }
+
+  /**
+   * Verifies that this visitor has not already been finalized; performs no other work.
+   */
+  @Override
+  public void enterCompositeTransform(TransformTreeNode node) {
+    checkState(
+        !finalized,
+        "Attempted to use a %s that has already been finalized on a pipeline 
(visiting node %s)",
+        KeyedPValueTrackingVisitor.class.getSimpleName(),
+        node);
+  }
+
+  /**
+   * Verifies that this visitor has not already been finalized, then either finalizes it (when
+   * {@code node} is the root node) or, when the node's transform class is one that produces keyed
+   * outputs, records all of the node's expanded outputs as keyed.
+   */
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    checkState(
+        !finalized,
+        "Attempted to use a %s that has already been finalized on a pipeline 
(visiting node %s)",
+        KeyedPValueTrackingVisitor.class.getSimpleName(),
+        node);
+    // The root node is checked first, so its transform is never dereferenced here.
+    if (node.isRootNode()) {
+      finalized = true;
+    } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) {
+      keyedValues.addAll(node.getExpandedOutputs());
+    }
+  }
+
+  /** Does nothing; transforms are handled through the values they produce. */
+  @Override
+  public void visitTransform(TransformTreeNode node) {}
+
+  /**
+   * Records the expansion of {@code value} as keyed when the transform class of its producer is
+   * one that produces keyed outputs. Unlike the composite-transform callbacks, this does not check
+   * whether the visitor has been finalized.
+   */
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    if (producesKeyedOutputs.contains(producer.getTransform().getClass())) {
+      keyedValues.addAll(value.expand());
+    }
+  }
+
+  /**
+   * Returns the keyed values recorded during the traversal. The returned set is the visitor's own
+   * backing set, not a copy.
+   *
+   * <p>Fails the state check if the visitor has not yet been finalized.
+   */
+  public Set<PValue> getKeyedPValues() {
+    checkState(
+        finalized, "can't call getKeyedPValues before a Pipeline has been 
completely traversed");
+    return keyedValues;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
new file mode 100644
index 0000000..cc9b6da
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Enforcement tools that verify that executing code conforms to the model.
+ *
+ * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
+ * {@link ModelEnforcement} is provided with the input bundle as part of
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
+ * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
+ * output {@link InProcessTransformResult} and committed output bundles after the
+ * {@link TransformEvaluator} has completed.
+ *
+ * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
+ * of the input {@link PCollection}) on construction, and then enforce per-element behavior
+ * (such as the immutability of input elements). When the element is output or the bundle is
+ * completed, the required conditions can be enforced across all elements.
+ *
+ * @param <T> the type of the elements in the input bundle
+ */
+public interface ModelEnforcement<T> {
+  /**
+   * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void beforeElement(WindowedValue<T> element);
+
+  /**
+   * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void afterElement(WindowedValue<T> element);
+
+  /**
+   * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
+   * called, producing the provided {@link InProcessTransformResult} and
+   * {@link CommittedBundle output bundles}.
+   *
+   * @param input the input bundle that was processed
+   * @param result the result produced by finishing the bundle
+   * @param outputs the committed output bundles
+   */
+  void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
new file mode 100644
index 0000000..6162ba0
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+
+/**
+ * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
+ * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
+ * {@link TransformEvaluator} is created.
+ */
+public interface ModelEnforcementFactory {
+  /**
+   * Creates the {@link ModelEnforcement} to apply while {@code consumer} processes {@code input}.
+   *
+   * @param input the bundle that is about to be processed
+   * @param consumer the transform that will consume {@code input}
+   */
+  <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, 
AppliedPTransform<?, ?, ?> consumer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
new file mode 100644
index 0000000..ffdee9d
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link Clock} that uses {@link System#nanoTime()} to track the progress 
of time.
+ */
+public class NanosOffsetClock implements Clock {
+  private final long baseMillis;
+  private final long nanosAtBaseMillis;
+
+  public static NanosOffsetClock create() {
+    return new NanosOffsetClock();
+  }
+
+  private NanosOffsetClock() {
+    baseMillis = System.currentTimeMillis();
+    nanosAtBaseMillis = System.nanoTime();
+  }
+
+  @Override
+  public Instant now() {
+    return new Instant(
+        baseMillis + (TimeUnit.MILLISECONDS.convert(
+            System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
+  }
+
+  /**
+   * Creates instances of {@link NanosOffsetClock}.
+   */
+  public static class Factory implements DefaultValueFactory<Clock> {
+    @Override
+    public Clock create(PipelineOptions options) {
+      return new NanosOffsetClock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
new file mode 100644
index 0000000..81e4863
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A factory that may replace a {@link PTransform} with an override.
+ */
+interface PTransformOverrideFactory {
+  /**
+   * Create a {@link PTransform} override for the provided {@link PTransform} if applicable.
+   * Otherwise, return the input {@link PTransform}.
+   *
+   * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}.
+   *
+   * @param transform the transform to override, if applicable
+   * @return the override, or {@code transform} itself when no override applies
+   */
+  <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> 
override(
+      PTransform<InputT, OutputT> transform);
+}

Reply via email to