Move InProcessRunner to its own module

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e13cacb8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e13cacb8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e13cacb8

Branch: refs/heads/master
Commit: e13cacb81c9c9718a45bfb7aefd839dcdfd442f6
Parents: bba4c64
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Apr 27 15:01:48 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Apr 29 14:28:45 2016 -0700

----------------------------------------------------------------------
 .travis.yml                                     |    1 +
 runners/direct-java/pom.xml                     |  400 ++++++
 .../direct/AbstractModelEnforcement.java        |   38 +
 .../direct/AvroIOShardedWriteFactory.java       |   76 +
 .../direct/BoundedReadEvaluatorFactory.java     |  155 ++
 .../beam/runners/direct/BundleFactory.java      |   49 +
 .../CachedThreadPoolExecutorServiceFactory.java |   44 +
 .../org/apache/beam/runners/direct/Clock.java   |   30 +
 .../beam/runners/direct/CommittedResult.java    |   46 +
 .../beam/runners/direct/CompletionCallback.java |   36 +
 .../direct/ConsumerTrackingPipelineVisitor.java |  173 +++
 .../runners/direct/EmptyTransformEvaluator.java |   50 +
 .../direct/EncodabilityEnforcementFactory.java  |   70 +
 .../beam/runners/direct/EvaluatorKey.java       |   55 +
 .../runners/direct/ExecutorServiceFactory.java  |   33 +
 .../direct/ExecutorServiceParallelExecutor.java |  478 +++++++
 .../runners/direct/FlattenEvaluatorFactory.java |   85 ++
 .../runners/direct/ForwardingPTransform.java    |   62 +
 .../direct/GroupByKeyEvaluatorFactory.java      |  274 ++++
 .../ImmutabilityCheckingBundleFactory.java      |  131 ++
 .../direct/ImmutabilityEnforcementFactory.java  |  103 ++
 .../direct/InMemoryWatermarkManager.java        | 1327 ++++++++++++++++++
 .../runners/direct/InProcessBundleFactory.java  |  162 +++
 .../direct/InProcessBundleOutputManager.java    |   51 +
 .../direct/InProcessEvaluationContext.java      |  425 ++++++
 .../direct/InProcessExecutionContext.java       |  105 ++
 .../beam/runners/direct/InProcessExecutor.java  |   48 +
 .../direct/InProcessPipelineOptions.java        |  101 ++
 .../runners/direct/InProcessPipelineRunner.java |  370 +++++
 .../beam/runners/direct/InProcessRegistrar.java |   55 +
 .../direct/InProcessSideInputContainer.java     |  271 ++++
 .../runners/direct/InProcessTimerInternals.java |   84 ++
 .../direct/InProcessTransformResult.java        |   77 +
 .../direct/KeyedPValueTrackingVisitor.java      |   96 ++
 .../beam/runners/direct/ModelEnforcement.java   |   63 +
 .../runners/direct/ModelEnforcementFactory.java |   30 +
 .../beam/runners/direct/NanosOffsetClock.java   |   59 +
 .../direct/PTransformOverrideFactory.java       |   33 +
 .../runners/direct/ParDoInProcessEvaluator.java |  173 +++
 .../direct/ParDoMultiEvaluatorFactory.java      |   64 +
 .../direct/ParDoSingleEvaluatorFactory.java     |   63 +
 .../direct/PassthroughTransformEvaluator.java   |   49 +
 .../runners/direct/ShardControlledWrite.java    |   81 ++
 .../apache/beam/runners/direct/StepAndKey.java  |   71 +
 .../runners/direct/StepTransformResult.java     |  165 +++
 .../direct/TextIOShardedWriteFactory.java       |   78 +
 .../beam/runners/direct/TransformEvaluator.java |   46 +
 .../direct/TransformEvaluatorFactory.java       |   44 +
 .../direct/TransformEvaluatorRegistry.java      |   77 +
 .../beam/runners/direct/TransformExecutor.java  |  176 +++
 .../direct/TransformExecutorService.java        |   35 +
 .../direct/TransformExecutorServices.java       |  154 ++
 .../direct/UnboundedReadEvaluatorFactory.java   |  177 +++
 .../runners/direct/ViewEvaluatorFactory.java    |  145 ++
 .../direct/WatermarkCallbackExecutor.java       |  146 ++
 .../runners/direct/WindowEvaluatorFactory.java  |  131 ++
 .../direct/AvroIOShardedWriteFactoryTest.java   |  112 ++
 .../direct/BoundedReadEvaluatorFactoryTest.java |  290 ++++
 .../runners/direct/CommittedResultTest.java     |   77 +
 .../ConsumerTrackingPipelineVisitorTest.java    |  272 ++++
 .../EncodabilityEnforcementFactoryTest.java     |  257 ++++
 .../direct/FlattenEvaluatorFactoryTest.java     |  141 ++
 .../direct/ForwardingPTransformTest.java        |  112 ++
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |  183 +++
 .../ImmutabilityCheckingBundleFactoryTest.java  |  220 +++
 .../ImmutabilityEnforcementFactoryTest.java     |  128 ++
 .../direct/InMemoryWatermarkManagerTest.java    | 1168 +++++++++++++++
 .../direct/InProcessBundleFactoryTest.java      |  223 +++
 .../direct/InProcessEvaluationContextTest.java  |  526 +++++++
 .../direct/InProcessPipelineRegistrarTest.java  |   74 +
 .../direct/InProcessPipelineRunnerTest.java     |   78 +
 .../direct/InProcessSideInputContainerTest.java |  496 +++++++
 .../direct/InProcessTimerInternalsTest.java     |  133 ++
 .../direct/KeyedPValueTrackingVisitorTest.java  |  192 +++
 .../apache/beam/runners/direct/MockClock.java   |   62 +
 .../direct/ParDoMultiEvaluatorFactoryTest.java  |  431 ++++++
 .../direct/ParDoSingleEvaluatorFactoryTest.java |  324 +++++
 .../direct/TextIOShardedWriteFactoryTest.java   |  112 ++
 .../direct/TransformExecutorServicesTest.java   |  136 ++
 .../runners/direct/TransformExecutorTest.java   |  538 +++++++
 .../UnboundedReadEvaluatorFactoryTest.java      |  334 +++++
 .../direct/ViewEvaluatorFactoryTest.java        |  101 ++
 .../direct/WatermarkCallbackExecutorTest.java   |  128 ++
 .../direct/WindowEvaluatorFactoryTest.java      |  222 +++
 runners/pom.xml                                 |    1 +
 .../inprocess/AbstractModelEnforcement.java     |   38 -
 .../inprocess/AvroIOShardedWriteFactory.java    |   76 -
 .../inprocess/BoundedReadEvaluatorFactory.java  |  155 --
 .../sdk/runners/inprocess/BundleFactory.java    |   49 -
 .../CachedThreadPoolExecutorServiceFactory.java |   44 -
 .../beam/sdk/runners/inprocess/Clock.java       |   30 -
 .../sdk/runners/inprocess/CommittedResult.java  |   46 -
 .../runners/inprocess/CompletionCallback.java   |   36 -
 .../ConsumerTrackingPipelineVisitor.java        |  173 ---
 .../inprocess/EmptyTransformEvaluator.java      |   50 -
 .../EncodabilityEnforcementFactory.java         |   70 -
 .../sdk/runners/inprocess/EvaluatorKey.java     |   55 -
 .../inprocess/ExecutorServiceFactory.java       |   33 -
 .../ExecutorServiceParallelExecutor.java        |  478 -------
 .../inprocess/FlattenEvaluatorFactory.java      |   85 --
 .../runners/inprocess/ForwardingPTransform.java |   62 -
 .../inprocess/GroupByKeyEvaluatorFactory.java   |  274 ----
 .../ImmutabilityCheckingBundleFactory.java      |  131 --
 .../ImmutabilityEnforcementFactory.java         |  103 --
 .../inprocess/InMemoryWatermarkManager.java     | 1327 ------------------
 .../inprocess/InProcessBundleFactory.java       |  162 ---
 .../inprocess/InProcessBundleOutputManager.java |   51 -
 .../inprocess/InProcessEvaluationContext.java   |  425 ------
 .../inprocess/InProcessExecutionContext.java    |  105 --
 .../runners/inprocess/InProcessExecutor.java    |   48 -
 .../inprocess/InProcessPipelineOptions.java     |  101 --
 .../inprocess/InProcessPipelineRunner.java      |  370 -----
 .../runners/inprocess/InProcessRegistrar.java   |   55 -
 .../inprocess/InProcessSideInputContainer.java  |  271 ----
 .../inprocess/InProcessTimerInternals.java      |   84 --
 .../inprocess/InProcessTransformResult.java     |   77 -
 .../inprocess/KeyedPValueTrackingVisitor.java   |   96 --
 .../sdk/runners/inprocess/ModelEnforcement.java |   63 -
 .../inprocess/ModelEnforcementFactory.java      |   30 -
 .../sdk/runners/inprocess/NanosOffsetClock.java |   59 -
 .../inprocess/PTransformOverrideFactory.java    |   33 -
 .../inprocess/ParDoInProcessEvaluator.java      |  173 ---
 .../inprocess/ParDoMultiEvaluatorFactory.java   |   63 -
 .../inprocess/ParDoSingleEvaluatorFactory.java  |   63 -
 .../PassthroughTransformEvaluator.java          |   49 -
 .../runners/inprocess/ShardControlledWrite.java |   81 --
 .../beam/sdk/runners/inprocess/StepAndKey.java  |   71 -
 .../runners/inprocess/StepTransformResult.java  |  165 ---
 .../inprocess/TextIOShardedWriteFactory.java    |   78 -
 .../runners/inprocess/TransformEvaluator.java   |   46 -
 .../inprocess/TransformEvaluatorFactory.java    |   44 -
 .../inprocess/TransformEvaluatorRegistry.java   |   77 -
 .../runners/inprocess/TransformExecutor.java    |  176 ---
 .../inprocess/TransformExecutorService.java     |   35 -
 .../inprocess/TransformExecutorServices.java    |  154 --
 .../UnboundedReadEvaluatorFactory.java          |  177 ---
 .../runners/inprocess/ViewEvaluatorFactory.java |  145 --
 .../inprocess/WatermarkCallbackExecutor.java    |  146 --
 .../inprocess/WindowEvaluatorFactory.java       |  131 --
 .../AvroIOShardedWriteFactoryTest.java          |  112 --
 .../BoundedReadEvaluatorFactoryTest.java        |  290 ----
 .../runners/inprocess/CommittedResultTest.java  |   77 -
 .../ConsumerTrackingPipelineVisitorTest.java    |  272 ----
 .../EncodabilityEnforcementFactoryTest.java     |  257 ----
 .../inprocess/FlattenEvaluatorFactoryTest.java  |  141 --
 .../inprocess/ForwardingPTransformTest.java     |  112 --
 .../GroupByKeyEvaluatorFactoryTest.java         |  183 ---
 .../ImmutabilityCheckingBundleFactoryTest.java  |  220 ---
 .../ImmutabilityEnforcementFactoryTest.java     |  128 --
 .../inprocess/InMemoryWatermarkManagerTest.java | 1168 ---------------
 .../inprocess/InProcessBundleFactoryTest.java   |  223 ---
 .../InProcessEvaluationContextTest.java         |  526 -------
 .../InProcessPipelineRegistrarTest.java         |   74 -
 .../inprocess/InProcessPipelineRunnerTest.java  |   78 -
 .../InProcessSideInputContainerTest.java        |  496 -------
 .../inprocess/InProcessTimerInternalsTest.java  |  133 --
 .../KeyedPValueTrackingVisitorTest.java         |  192 ---
 .../beam/sdk/runners/inprocess/MockClock.java   |   62 -
 .../ParDoMultiEvaluatorFactoryTest.java         |  431 ------
 .../ParDoSingleEvaluatorFactoryTest.java        |  324 -----
 .../TextIOShardedWriteFactoryTest.java          |  112 --
 .../TransformExecutorServicesTest.java          |  136 --
 .../inprocess/TransformExecutorTest.java        |  538 -------
 .../UnboundedReadEvaluatorFactoryTest.java      |  334 -----
 .../inprocess/ViewEvaluatorFactoryTest.java     |  101 --
 .../WatermarkCallbackExecutorTest.java          |  128 --
 .../inprocess/WindowEvaluatorFactoryTest.java   |  222 ---
 sdks/java/pom.xml                               |    1 -
 168 files changed, 14692 insertions(+), 14290 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index f1d9e3b..8aad36e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,6 +25,7 @@ matrix:
       env: CUSTOM_JDK="openjdk7" MAVEN_OVERRIDE="-DforkCount=0"
 
 before_install:
+  - echo "MAVEN_OPTS='-Xmx2048m -XX:MaxPermSize=512m'" > ~/.mavenrc
   - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export 
JAVA_HOME=$(/usr/libexec/java_home); fi
   - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; 
fi
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
new file mode 100644
index 0000000..12ba329
--- /dev/null
+++ b/runners/direct-java/pom.xml
@@ -0,0 +1,400 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>runners-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>direct-runner</artifactId>
+
+  <name>Apache Beam :: Runners :: Direct</name>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+    </resources>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals><goal>analyze-only</goal></goals>
+            <configuration>
+              <failOnWarning>true</failOnWarning>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default-jar</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>default-test-jar</id>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>jar-no-fork</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>attach-test-sources</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>test-jar-no-fork</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <executions>
+          <!-- For now, disables integration tests from the SDK as the runner 
is not ready. -->
+          <execution>
+            <id>runnable-on-service-tests</id>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <windowtitle>Apache Beam Direct Runner 
${project.version}</windowtitle>
+          <doctitle>Apache Beam Direct Runner, version 
${project.version}</doctitle>
+
+          <subpackages>org.apache.beam.runners.direct</subpackages>
+          <use>false</use>
+          <quiet>true</quiet>
+          <bottom><![CDATA[<br>]]></bottom>
+
+          <offlineLinks>
+            <offlineLink>
+              
<url>https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/apiclient-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://avro.apache.org/docs/1.7.7/api/java/</url>
+              <location>${basedir}/../../sdks/java/javadoc/avro-docs</location>
+            </offlineLink>
+            <offlineLink>
+              
<url>https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/</url>
+              <location>${basedir}/../../sdks/java/javadoc/bq-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>https://cloud.google.com/datastore/docs/apis/javadoc/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/datastore-docs</location>
+            </offlineLink>
+            <offlineLink>
+              
<url>http://docs.guava-libraries.googlecode.com/git-history/release19/javadoc/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/guava-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://hamcrest.org/JavaHamcrest/javadoc/1.3/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/hamcrest-docs</location>
+            </offlineLink>
+            <offlineLink>
+              
<url>http://fasterxml.github.io/jackson-annotations/javadoc/2.7/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/jackson-annotations-docs</location>
+            </offlineLink>
+            <offlineLink>
+              
<url>http://fasterxml.github.io/jackson-databind/javadoc/2.7/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/jackson-databind-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://www.joda.org/joda-time/apidocs</url>
+              <location>${basedir}/../../sdks/java/javadoc/joda-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://junit.sourceforge.net/javadoc/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/junit-docs</location>
+            </offlineLink>
+            <offlineLink>
+              
<url>https://developers.google.com/api-client-library/java/google-oauth-java-client/reference/1.20.0/</url>
+              
<location>${basedir}/../../sdks/java/javadoc/oauth-docs</location>
+            </offlineLink>
+          </offlineLinks>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.4.1</version>
+        <executions>
+          <!-- In the first phase, we pick dependencies and relocate them. -->
+          <execution>
+            <id>bundle-and-repackage</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <!-- TODO: Once ready, change the following pattern to 'com'
+                     only, exclude 'org.apache.beam.**', and remove
+                     the second relocation. -->
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  
<shadedPattern>org.apache.beam.runners.direct.repackaged.com.google.common</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+
+          <!-- In the second phase, we pick remaining dependencies and bundle
+               them without repackaging. -->
+          <execution>
+            <id>bundle-rest-without-repackaging</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <artifactSet>
+                <excludes>
+                  <exclude>com.google.guava:guava</exclude>
+                </excludes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-protobuf</artifactId>
+      <version>${google-clients.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <!-- If updating version, please update the javadoc offlineLink -->
+      <version>${guava.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava-testlib</artifactId>
+      <version>${guava.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <version>${joda.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>${jsr305.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <!-- build dependencies -->
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <version>1.0-rc2</version>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <version>1.1</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.10.19</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
new file mode 100644
index 0000000..948beb6
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * An abstract {@link ModelEnforcement} that provides default empty 
implementations for each method.
+ */
+abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
+  @Override
+  public void beforeElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
new file mode 100644
index 0000000..7422f27
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+class AvroIOShardedWriteFactory implements PTransformOverrideFactory {
+  @Override
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, 
OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    if (transform instanceof AvroIO.Write.Bound) {
+      @SuppressWarnings("unchecked")
+      AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) 
transform;
+      if (originalWrite.getNumShards() > 1
+          || (originalWrite.getNumShards() == 1
+              && !"".equals(originalWrite.getShardNameTemplate()))) {
+        @SuppressWarnings("unchecked")
+        PTransform<InputT, OutputT> override =
+            (PTransform<InputT, OutputT>) new 
AvroIOShardedWrite<InputT>(originalWrite);
+        return override;
+      }
+    }
+    return transform;
+  }
+
+  private class AvroIOShardedWrite<InputT> extends 
ShardControlledWrite<InputT> {
+    private final AvroIO.Write.Bound<InputT> initial;
+
+    private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) {
+      this.initial = initial;
+    }
+
+    @Override
+    int getNumShards() {
+      return initial.getNumShards();
+    }
+
+    @Override
+    PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int 
shardNum) {
+      String shardName =
+          IOChannelUtils.constructName(
+              initial.getFilenamePrefix(),
+              initial.getShardNameTemplate(),
+              initial.getFilenameSuffix(),
+              shardNum,
+              getNumShards());
+      return initial.withoutSharding().to(shardName).withSuffix("");
+    }
+
+    @Override
+    protected PTransform<PCollection<InputT>, PDone> delegate() {
+      return initial;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
new file mode 100644
index 0000000..3822d3b
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator 
TransformEvaluators}
+ * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
+ */
+final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
+  /*
+   * An evaluator for a Source is stateful, to ensure data is not read 
multiple times.
+   * Evaluators are cached here to ensure that the reader is not restarted if 
the evaluator is
+   * retriggered.
+   */
+  private final ConcurrentMap<EvaluatorKey, Queue<? extends 
BoundedReadEvaluator<?>>>
+      sourceEvaluators = new ConcurrentHashMap<>();
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext)
+      throws IOException {
+    return getTransformEvaluator((AppliedPTransform) application, 
evaluationContext);
+  }
+
+  private <OutputT> TransformEvaluator<?> getTransformEvaluator(
+      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform,
+      final InProcessEvaluationContext evaluationContext) {
+    BoundedReadEvaluator<?> evaluator =
+        getTransformEvaluatorQueue(transform, evaluationContext).poll();
+    if (evaluator == null) {
+      return EmptyTransformEvaluator.create(transform);
+    }
+    return evaluator;
+  }
+
+  /**
+   * Get the queue of {@link TransformEvaluator TransformEvaluators} that 
produce elements for the
+   * provided application of {@link Bounded Read.Bounded}, initializing it if 
required.
+   *
+   * <p>This method is thread-safe, and will only produce new evaluators if no 
other invocation has
+   * already done so.
+   */
+  @SuppressWarnings("unchecked")
+  private <OutputT> Queue<BoundedReadEvaluator<OutputT>> 
getTransformEvaluatorQueue(
+      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform,
+      final InProcessEvaluationContext evaluationContext) {
+    // Key by the application and the context the evaluation is occurring in 
(which call to
+    // Pipeline#run).
+    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
+    Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
+        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+    if (evaluatorQueue == null) {
+      evaluatorQueue = new ConcurrentLinkedQueue<>();
+      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+        // If no queue existed in the evaluators, add an evaluator to 
initialize the evaluator
+        // factory for this transform
+        BoundedSource<OutputT> source = transform.getTransform().getSource();
+        BoundedReadEvaluator<OutputT> evaluator =
+            new BoundedReadEvaluator<OutputT>(transform, evaluationContext, 
source);
+        evaluatorQueue.offer(evaluator);
+      } else {
+        // otherwise return the existing Queue that arrived before us
+        evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) 
sourceEvaluators.get(key);
+      }
+    }
+    return evaluatorQueue;
+  }
+
+  /**
+   * A {@link BoundedReadEvaluator} produces elements from an underlying 
{@link BoundedSource},
+   * discarding all input elements. Within the call to {@link 
#finishBundle()}, the evaluator
+   * creates the {@link BoundedReader} and consumes all available input.
+   *
+   * <p>A {@link BoundedReadEvaluator} should only be created once per {@link 
BoundedSource}, and
+   * each evaluator should only be called once per evaluation of the pipeline. 
Otherwise, the source
+   * may produce duplicate elements.
+   */
+  private static class BoundedReadEvaluator<OutputT> implements 
TransformEvaluator<Object> {
+    private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> 
transform;
+    private final InProcessEvaluationContext evaluationContext;
+    /**
+     * The source being read from by this {@link BoundedReadEvaluator}. This 
may not be the same
+     * as the source derived from {@link #transform} due to splitting.
+     */
+    private BoundedSource<OutputT> source;
+
+    public BoundedReadEvaluator(
+        AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform,
+        InProcessEvaluationContext evaluationContext,
+        BoundedSource<OutputT> source) {
+      this.transform = transform;
+      this.evaluationContext = evaluationContext;
+      this.source = source;
+    }
+
+    @Override
+    public void processElement(WindowedValue<Object> element) {}
+
+    @Override
+    public InProcessTransformResult finishBundle() throws IOException {
+      try (final BoundedReader<OutputT> reader =
+              source.createReader(evaluationContext.getPipelineOptions());) {
+        boolean contentsRemaining = reader.start();
+        UncommittedBundle<OutputT> output =
+            evaluationContext.createRootBundle(transform.getOutput());
+        while (contentsRemaining) {
+          output.add(
+              WindowedValue.timestampedValueInGlobalWindow(
+                  reader.getCurrent(), reader.getCurrentTimestamp()));
+          contentsRemaining = reader.advance();
+        }
+        return StepTransformResult.withHold(transform, 
BoundedWindow.TIMESTAMP_MAX_VALUE)
+            .addOutput(output)
+            .build();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
new file mode 100644
index 0000000..34529e7
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import 
org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import 
org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * A factory that creates {@link UncommittedBundle UncommittedBundles}.
+ */
+public interface BundleFactory {
+  /**
+   * Create an {@link UncommittedBundle} from an empty input. Elements added 
to the bundle belong to
+   * the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output);
+
+  /**
+   * Create an {@link UncommittedBundle} from the specified input. Elements 
added to the bundle
+   * belong to the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, 
PCollection<T> output);
+
+  /**
+   * Create an {@link UncommittedBundle} with the specified keys at the 
specified step. For use by
+   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements 
added to the bundle
+   * belong to the {@code output} {@link PCollection}.
+   */
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java
new file mode 100644
index 0000000..5b8e5fc
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ExecutorServiceFactory} that produces cached thread pools via
+ * {@link Executors#newCachedThreadPool()}.
+ */
+class CachedThreadPoolExecutorServiceFactory
+    implements DefaultValueFactory<ExecutorServiceFactory>, 
ExecutorServiceFactory {
+  private static final CachedThreadPoolExecutorServiceFactory INSTANCE =
+      new CachedThreadPoolExecutorServiceFactory();
+
+  @Override
+  public ExecutorServiceFactory create(PipelineOptions options) {
+    return INSTANCE;
+  }
+
+  @Override
+  public ExecutorService create() {
+    return Executors.newCachedThreadPool();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
new file mode 100644
index 0000000..88f8aab
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.joda.time.Instant;
+
+/**
+ * Access to the current time.
+ */
+public interface Clock {
+  /**
+   * Returns the current time as an {@link Instant}.
+   */
+  Instant now();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
new file mode 100644
index 0000000..d15e012
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * A {@link InProcessTransformResult} that has been committed.
+ */
+@AutoValue
+abstract class CommittedResult {
+  /**
+   * Returns the {@link AppliedPTransform} that produced this result.
+   */
+  public abstract AppliedPTransform<?, ?, ?> getTransform();
+
+  /**
+   * Returns the outputs produced by the transform.
+   */
+  public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
+
+  public static CommittedResult create(
+      InProcessTransformResult original, Iterable<? extends 
CommittedBundle<?>> outputs) {
+    return new AutoValue_CommittedResult(original.getTransform(),
+        outputs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
new file mode 100644
index 0000000..7c2c068
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+
+/**
+ * A callback for completing a bundle of input.
+ */
+interface CompletionCallback {
+  /**
+   * Handle a successful result, returning the committed outputs of the result.
+   */
+  CommittedResult handleResult(
+      CommittedBundle<?> inputBundle, InProcessTransformResult result);
+
+  /**
+   * Handle a result that terminated abnormally due to the provided {@link 
Throwable}.
+   */
+  void handleThrowable(CommittedBundle<?> inputBundle, Throwable t);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
new file mode 100644
index 0000000..c790463
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.PValue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each 
{@link PValue} in the
+ * {@link Pipeline}. This is used to schedule consuming {@link PTransform 
PTransforms} to consume
+ * input after the upstream transform has produced and committed output.
+ */
+public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
+  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers 
= new HashMap<>();
+  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new 
ArrayList<>();
+  private Collection<PCollectionView<?>> views = new ArrayList<>();
+  private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
+  private Set<PValue> toFinalize = new HashSet<>();
+  private int numTransforms = 0;
+  private boolean finalized = false;
+
+  @Override
+  public void enterCompositeTransform(TransformTreeNode node) {
+    checkState(
+        !finalized,
+        "Attempting to traverse a pipeline (node %s) with a %s "
+            + "which has already visited a Pipeline and is finalized",
+        node.getFullName(),
+        ConsumerTrackingPipelineVisitor.class.getSimpleName());
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    checkState(
+        !finalized,
+        "Attempting to traverse a pipeline (node %s) with a %s which is 
already finalized",
+        node.getFullName(),
+        ConsumerTrackingPipelineVisitor.class.getSimpleName());
+    if (node.isRootNode()) {
+      finalized = true;
+    }
+  }
+
+  @Override
+  public void visitTransform(TransformTreeNode node) {
+    toFinalize.removeAll(node.getInput().expand());
+    AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
+    stepNames.put(appliedTransform, genStepName());
+    if (node.getInput().expand().isEmpty()) {
+      rootTransforms.add(appliedTransform);
+    } else {
+      for (PValue value : node.getInput().expand()) {
+        valueToConsumers.get(value).add(appliedTransform);
+      }
+    }
+  }
+
+  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformTreeNode 
node) {
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
+        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
node.getTransform());
+    return application;
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    toFinalize.add(value);
+    for (PValue expandedValue : value.expand()) {
+      valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, 
?, ?>>());
+      if (expandedValue instanceof PCollectionView) {
+        views.add((PCollectionView<?>) expandedValue);
+      }
+      expandedValue.recordAsOutput(getAppliedTransform(producer));
+    }
+    value.recordAsOutput(getAppliedTransform(producer));
+  }
+
+  private String genStepName() {
+    return String.format("s%s", numTransforms++);
+  }
+
+
+  /**
+   * Returns a mapping of each fully-expanded {@link PValue} to each
+   * {@link AppliedPTransform} that consumes it. For each AppliedPTransform in 
the collection
+   * returned from {@code getValueToCustomers().get(PValue)},
+   * {@code AppliedPTransform#getInput().expand()} will contain the argument 
{@link PValue}.
+   */
+  public Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> 
getValueToConsumers() {
+    checkState(
+        finalized,
+        "Can't call getValueToConsumers before the Pipeline has been 
completely traversed");
+
+    return valueToConsumers;
+  }
+
+  /**
+   * Returns the mapping for each {@link AppliedPTransform} in the {@link 
Pipeline} to a unique step
+   * name.
+   */
+  public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() {
+    checkState(
+        finalized, "Can't call getStepNames before the Pipeline has been 
completely traversed");
+
+    return stepNames;
+  }
+
+  /**
+   * Returns the root transforms of the {@link Pipeline}. A root {@link 
AppliedPTransform} consumes
+   * a {@link PInput} where the {@link PInput#expand()} returns an empty 
collection.
+   */
+  public Collection<AppliedPTransform<?, ?, ?>> getRootTransforms() {
+    checkState(
+        finalized,
+        "Can't call getRootTransforms before the Pipeline has been completely 
traversed");
+
+    return rootTransforms;
+  }
+
+  /**
+   * Returns all of the {@link PCollectionView PCollectionViews} contained in 
the visited
+   * {@link Pipeline}.
+   */
+  public Collection<PCollectionView<?>> getViews() {
+    checkState(finalized, "Can't call getViews before the Pipeline has been 
completely traversed");
+
+    return views;
+  }
+
+  /**
+   * Returns all of the {@link PValue PValues} that have been produced but not 
consumed. These
+   * {@link PValue PValues} should be finalized by the {@link PipelineRunner} 
before the
+   * {@link Pipeline} is executed.
+   */
+  public Set<PValue> getUnfinalizedPValues() {
+    checkState(
+        finalized,
+        "Can't call getUnfinalizedPValues before the Pipeline has been 
completely traversed");
+
+    return toFinalize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
new file mode 100644
index 0000000..5379038
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A {@link TransformEvaluator} that ignores all input and produces no output. 
The result of
+ * invoking {@link #finishBundle()} on this evaluator is to return an
+ * {@link InProcessTransformResult} with no elements and a timestamp hold 
equal to
+ * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no 
elements, this hold
+ * will not affect the watermark.
+ */
+final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
+  public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> 
transform) {
+    return new EmptyTransformEvaluator<T>(transform);
+  }
+
+  private final AppliedPTransform<?, ?, ?> transform;
+
+  private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) {
+    this.transform = transform;
+  }
+
+  @Override
+  public void processElement(WindowedValue<T> element) throws Exception {}
+
+  @Override
+  public InProcessTransformResult finishBundle() throws Exception {
+    return StepTransformResult.withHold(transform, 
BoundedWindow.TIMESTAMP_MIN_VALUE)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
new file mode 100644
index 0000000..ccf4c2b
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Enforces that all elements in a {@link PCollection} can be encoded using 
that
+ * {@link PCollection PCollection's} {@link Coder}.
+ */
+class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static EncodabilityEnforcementFactory create() {
+    return new EncodabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new EncodabilityEnforcement<>(input);
+  }
+
+  private static class EncodabilityEnforcement<T> extends 
AbstractModelEnforcement<T> {
+    private Coder<T> coder;
+
+    public EncodabilityEnforcement(CommittedBundle<T> input) {
+      coder = input.getPCollection().getCoder();
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        T clone = CoderUtils.clone(coder, element.getValue());
+        if (coder.consistentWithEquals()) {
+          checkArgument(
+              
coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
+              "Coder %s of class %s does not maintain structural value 
equality"
+                  + " on input element %s",
+              coder,
+              coder.getClass().getSimpleName(),
+              element.getValue());
+        }
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
new file mode 100644
index 0000000..1c36751
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+
+import java.util.Objects;
+
+/**
+ * A (Transform, Pipeline Execution) key for stateful evaluators.
+ *
+ * Source evaluators are stateful to ensure data is not read multiple times. 
Evaluators are cached
+ * to ensure that the reader is not restarted if the evaluator is retriggered. 
An
+ * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be 
executed without sharing
+ * the same evaluators.
+ */
+final class EvaluatorKey {
+  private final AppliedPTransform<?, ?, ?> transform;
+  private final InProcessEvaluationContext context;
+
+  public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, 
InProcessEvaluationContext context) {
+    this.transform = transform;
+    this.context = context;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(transform, context);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof EvaluatorKey)) {
+      return false;
+    }
+    EvaluatorKey that = (EvaluatorKey) other;
+    return Objects.equals(this.transform, that.transform)
+        && Objects.equals(this.context, that.context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
new file mode 100644
index 0000000..91dc258
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A factory that creates {@link ExecutorService ExecutorServices}.
+ * {@link ExecutorService ExecutorServices} created by this factory should be 
independent of one
+ * another (e.g., if any executor is shut down the remaining executors should 
continue to process
+ * work).
+ */
+public interface ExecutorServiceFactory {
+  /**
+   * Create a new {@link ExecutorService}.
+   */
+  ExecutorService create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
new file mode 100644
index 0000000..18af363
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link InProcessExecutor} that uses an underlying {@link 
ExecutorService} and
+ * {@link InProcessEvaluationContext} to execute a {@link Pipeline}.
+ */
+final class ExecutorServiceParallelExecutor implements InProcessExecutor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
+
+  private final ExecutorService executorService;
+
+  private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> 
valueToConsumers;
+  private final Set<PValue> keyedPValues;
+  private final TransformEvaluatorRegistry registry;
+  @SuppressWarnings("rawtypes")
+  private final Map<Class<? extends PTransform>, 
Collection<ModelEnforcementFactory>>
+      transformEnforcements;
+
+  private final InProcessEvaluationContext evaluationContext;
+
+  private final LoadingCache<StepAndKey, TransformExecutorService> 
executorServices;
+  private final ConcurrentMap<TransformExecutor<?>, Boolean> 
scheduledExecutors;
+
+  private final Queue<ExecutorUpdate> allUpdates;
+  private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
+
+  private final TransformExecutorService parallelExecutorService;
+  private final CompletionCallback defaultCompletionCallback;
+
+  private Collection<AppliedPTransform<?, ?, ?>> rootNodes;
+
+  public static ExecutorServiceParallelExecutor create(
+      ExecutorService executorService,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Set<PValue> keyedPValues,
+      TransformEvaluatorRegistry registry,
+      @SuppressWarnings("rawtypes")
+      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> 
transformEnforcements,
+      InProcessEvaluationContext context) {
+    return new ExecutorServiceParallelExecutor(
+        executorService, valueToConsumers, keyedPValues, registry, 
transformEnforcements, context);
+  }
+
+  private ExecutorServiceParallelExecutor(
+      ExecutorService executorService,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Set<PValue> keyedPValues,
+      TransformEvaluatorRegistry registry,
+      @SuppressWarnings("rawtypes")
+      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> 
transformEnforcements,
+      InProcessEvaluationContext context) {
+    this.executorService = executorService;
+    this.valueToConsumers = valueToConsumers;
+    this.keyedPValues = keyedPValues;
+    this.registry = registry;
+    this.transformEnforcements = transformEnforcements;
+    this.evaluationContext = context;
+
+    scheduledExecutors = new ConcurrentHashMap<>();
+    // Weak Values allows TransformExecutorServices that are no longer in use 
to be reclaimed.
+    // Executing TransformExecutorServices have a strong reference to their 
TransformExecutorService
+    // which stops the TransformExecutorServices from being prematurely 
garbage collected
+    executorServices =
+        
CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
+
+    this.allUpdates = new ConcurrentLinkedQueue<>();
+    this.visibleUpdates = new ArrayBlockingQueue<>(20);
+
+    parallelExecutorService =
+        TransformExecutorServices.parallel(executorService, 
scheduledExecutors);
+    defaultCompletionCallback = new DefaultCompletionCallback();
+  }
+
+  private CacheLoader<StepAndKey, TransformExecutorService>
+      serialTransformExecutorServiceCacheLoader() {
+    return new CacheLoader<StepAndKey, TransformExecutorService>() {
+      @Override
+      public TransformExecutorService load(StepAndKey stepAndKey) throws 
Exception {
+        return TransformExecutorServices.serial(executorService, 
scheduledExecutors);
+      }
+    };
+  }
+
+  @Override
+  public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
+    rootNodes = ImmutableList.copyOf(roots);
+    Runnable monitorRunnable = new MonitorRunnable();
+    executorService.submit(monitorRunnable);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void scheduleConsumption(
+      AppliedPTransform<?, ?, ?> consumer,
+      @Nullable CommittedBundle<?> bundle,
+      CompletionCallback onComplete) {
+    evaluateBundle(consumer, bundle, onComplete);
+  }
+
+  private <T> void evaluateBundle(
+      final AppliedPTransform<?, ?, ?> transform,
+      @Nullable final CommittedBundle<T> bundle,
+      final CompletionCallback onComplete) {
+    TransformExecutorService transformExecutor;
+
+    if (bundle != null && isKeyed(bundle.getPCollection())) {
+      final StepAndKey stepAndKey =
+          StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
+      // This executor will remain reachable until it has executed all 
scheduled transforms.
+      // The TransformExecutors keep a strong reference to the Executor, the 
ExecutorService keeps
+      // a reference to the scheduled TransformExecutor callable. Follow-up 
TransformExecutors
+      // (scheduled due to the completion of another TransformExecutor) are 
provided to the
+      // ExecutorService before the Earlier TransformExecutor callable 
completes.
+      transformExecutor = executorServices.getUnchecked(stepAndKey);
+    } else {
+      transformExecutor = parallelExecutorService;
+    }
+
+    Collection<ModelEnforcementFactory> enforcements =
+        MoreObjects.firstNonNull(
+            transformEnforcements.get(transform.getTransform().getClass()),
+            Collections.<ModelEnforcementFactory>emptyList());
+
+    TransformExecutor<T> callable =
+        TransformExecutor.create(
+            registry,
+            enforcements,
+            evaluationContext,
+            bundle,
+            transform,
+            onComplete,
+            transformExecutor);
+    transformExecutor.schedule(callable);
+  }
+
+  private boolean isKeyed(PValue pvalue) {
+    return keyedPValues.contains(pvalue);
+  }
+
+  private void scheduleConsumers(CommittedBundle<?> bundle) {
+    for (AppliedPTransform<?, ?, ?> consumer : 
valueToConsumers.get(bundle.getPCollection())) {
+      scheduleConsumption(consumer, bundle, defaultCompletionCallback);
+    }
+  }
+
+  @Override
+  public void awaitCompletion() throws Throwable {
+    VisibleExecutorUpdate update;
+    do {
+      update = visibleUpdates.take();
+      if (update.throwable.isPresent()) {
+        throw update.throwable.get();
+      }
+    } while (!update.isDone());
+    executorService.shutdown();
+  }
+
+  /**
+   * The default {@link CompletionCallback}. The default completion callback 
is used to complete
+   * transform evaluations that are triggered due to the arrival of elements 
from an upstream
+   * transform, or for a source transform.
+   */
+  private class DefaultCompletionCallback implements CompletionCallback {
+    @Override
+    public CommittedResult handleResult(
+        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      CommittedResult committedResult =
+          evaluationContext.handleResult(inputBundle, 
Collections.<TimerData>emptyList(), result);
+      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
+        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+      }
+      return committedResult;
+    }
+
+    @Override
+    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+    }
+  }
+
+  /**
+   * A {@link CompletionCallback} where the completed bundle was produced to 
deliver some collection
+   * of {@link TimerData timers}. When the evaluator completes successfully, 
reports all of the
+   * timers used to create the input to the {@link InProcessEvaluationContext 
evaluation context}
+   * as part of the result.
+   */
+  private class TimerCompletionCallback implements CompletionCallback {
+    private final Iterable<TimerData> timers;
+
+    private TimerCompletionCallback(Iterable<TimerData> timers) {
+      this.timers = timers;
+    }
+
+    @Override
+    public CommittedResult handleResult(
+        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      CommittedResult committedResult =
+          evaluationContext.handleResult(inputBundle, timers, result);
+      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
+        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+      }
+      return committedResult;
+    }
+
+    @Override
+    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+    }
+  }
+
+  /**
+   * An internal status update on the state of the executor.
+   *
+   * Used to signal when the executor should be shut down (due to an 
exception).
+   */
+  private static class ExecutorUpdate {
+    private final Optional<? extends CommittedBundle<?>> bundle;
+    private final Optional<? extends Throwable> throwable;
+
+    public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
+      return new ExecutorUpdate(bundle, null);
+    }
+
+    public static ExecutorUpdate fromThrowable(Throwable t) {
+      return new ExecutorUpdate(null, t);
+    }
+
+    private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable 
throwable) {
+      this.bundle = Optional.fromNullable(producedBundle);
+      this.throwable = Optional.fromNullable(throwable);
+    }
+
+    public Optional<? extends CommittedBundle<?>> getBundle() {
+      return bundle;
+    }
+
+    public Optional<? extends Throwable> getException() {
+      return throwable;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(ExecutorUpdate.class)
+          .add("bundle", bundle)
+          .add("exception", throwable)
+          .toString();
+    }
+  }
+
+  /**
+   * An update of interest to the user. Used in {@link #awaitCompletion} to 
decide whether to
+   * return normally or throw an exception.
+   */
+  private static class VisibleExecutorUpdate {
+    private final Optional<? extends Throwable> throwable;
+    private final boolean done;
+
+    public static VisibleExecutorUpdate fromThrowable(Throwable e) {
+      return new VisibleExecutorUpdate(false, e);
+    }
+
+    public static VisibleExecutorUpdate finished() {
+      return new VisibleExecutorUpdate(true, null);
+    }
+
+    private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) 
{
+      this.throwable = Optional.fromNullable(exception);
+      this.done = done;
+    }
+
+    public boolean isDone() {
+      return done;
+    }
+  }
+
+  private class MonitorRunnable implements Runnable {
+    private final String runnableName =
+        String.format(
+            "%s$%s-monitor",
+            evaluationContext.getPipelineOptions().getAppName(),
+            ExecutorServiceParallelExecutor.class.getSimpleName());
+
+    @Override
+    public void run() {
+      String oldName = Thread.currentThread().getName();
+      Thread.currentThread().setName(runnableName);
+      try {
+        ExecutorUpdate update = allUpdates.poll();
+        // pull all of the pending work off of the queue
+        while (update != null) {
+          LOG.debug("Executor Update: {}", update);
+          if (update.getBundle().isPresent()) {
+            scheduleConsumers(update.getBundle().get());
+          } else if (update.getException().isPresent()) {
+            
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
+          }
+          update = allUpdates.poll();
+        }
+        boolean timersFired = fireTimers();
+        addWorkIfNecessary(timersFired);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.error("Monitor died due to being interrupted");
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
+          visibleUpdates.poll();
+        }
+      } catch (Throwable t) {
+        LOG.error("Monitor thread died due to throwable", t);
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) {
+          visibleUpdates.poll();
+        }
+      } finally {
+        if (!shouldShutdown()) {
+          // The monitor thread should always be scheduled; but we only need 
to be scheduled once
+          executorService.submit(this);
+        }
+        Thread.currentThread().setName(oldName);
+      }
+    }
+
+    /**
+     * Fires any available timers. Returns true if at least one timer was 
fired.
+     */
+    private boolean fireTimers() throws Exception {
+      try {
+        boolean firedTimers = false;
+        for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> 
transformTimers :
+            evaluationContext.extractFiredTimers().entrySet()) {
+          AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
+          for (Map.Entry<Object, FiredTimers> keyTimers : 
transformTimers.getValue().entrySet()) {
+            for (TimeDomain domain : TimeDomain.values()) {
+              Collection<TimerData> delivery = 
keyTimers.getValue().getTimers(domain);
+              if (delivery.isEmpty()) {
+                continue;
+              }
+              KeyedWorkItem<Object, Object> work =
+                  KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
+              @SuppressWarnings({"unchecked", "rawtypes"})
+              CommittedBundle<?> bundle =
+                  evaluationContext
+                      .createKeyedBundle(
+                          null, keyTimers.getKey(), (PCollection) 
transform.getInput())
+                      .add(WindowedValue.valueInEmptyWindows(work))
+                      .commit(Instant.now());
+              scheduleConsumption(transform, bundle, new 
TimerCompletionCallback(delivery));
+              firedTimers = true;
+            }
+          }
+        }
+        return firedTimers;
+      } catch (Exception e) {
+        LOG.error("Internal Error while delivering timers", e);
+        throw e;
+      }
+    }
+
+    private boolean shouldShutdown() {
+      if (evaluationContext.isDone()) {
+        LOG.debug("Pipeline is finished. Shutting down. {}");
+        while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
+          visibleUpdates.poll();
+        }
+        executorService.shutdown();
+        return true;
+      }
+      return false;
+    }
+
+    /**
+     * If all active {@link TransformExecutor TransformExecutors} are in a 
blocked state,
+     * add more work from root nodes that may have additional work. This 
ensures that if a pipeline
+     * has elements available from the root nodes it will add those elements 
when necessary.
+     */
+    private void addWorkIfNecessary(boolean firedTimers) {
+      // If any timers have fired, they will add more work; We don't need to 
add more
+      if (firedTimers) {
+        return;
+      }
+      for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
+        if (!isExecutorBlocked(executor)) {
+          // We have at least one executor that can proceed without adding 
additional work
+          return;
+        }
+      }
+      // All current TransformExecutors are blocked; add more work from the 
roots.
+      for (AppliedPTransform<?, ?, ?> root : rootNodes) {
+        if (!evaluationContext.isDone(root)) {
+          scheduleConsumption(root, null, defaultCompletionCallback);
+        }
+      }
+    }
+
+    /**
+     * Return true if the provided executor might make more progress if no 
action is taken.
+     *
+     * <p>May return false even if all executor threads are currently blocked 
or cleaning up, as
+     * these can cause more work to be scheduled. If this does not occur, 
after these calls
+     * terminate, future calls will return true if all executors are waiting.
+     */
+    private boolean isExecutorBlocked(TransformExecutor<?> executor) {
+      Thread thread = executor.getThread();
+      if (thread == null) {
+        return false;
+      }
+      switch (thread.getState()) {
+        case TERMINATED:
+          throw new IllegalStateException(String.format(
+              "Unexpectedly encountered a Terminated TransformExecutor %s", 
executor));
+        case WAITING:
+        case TIMED_WAITING:
+          // The thread is waiting for some external input. Adding more work 
may cause the thread
+          // to stop waiting (e.g. the thread is waiting on an unbounded side 
input)
+          return true;
+        case BLOCKED:
+          // The executor is blocked on acquisition of a java monitor. This 
usually means it is
+          // making a call to the EvaluationContext, but not a model-blocking 
call - and will
+          // eventually complete, at which point we may reevaluate.
+        default:
+          // NEW and RUNNABLE threads can make progress
+          return false;
+      }
+    }
+  }
+}

Reply via email to