Move InProcessRunner to its own module
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e13cacb8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e13cacb8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e13cacb8 Branch: refs/heads/master Commit: e13cacb81c9c9718a45bfb7aefd839dcdfd442f6 Parents: bba4c64 Author: Kenneth Knowles <k...@google.com> Authored: Wed Apr 27 15:01:48 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Apr 29 14:28:45 2016 -0700 ---------------------------------------------------------------------- .travis.yml | 1 + runners/direct-java/pom.xml | 400 ++++++ .../direct/AbstractModelEnforcement.java | 38 + .../direct/AvroIOShardedWriteFactory.java | 76 + .../direct/BoundedReadEvaluatorFactory.java | 155 ++ .../beam/runners/direct/BundleFactory.java | 49 + .../CachedThreadPoolExecutorServiceFactory.java | 44 + .../org/apache/beam/runners/direct/Clock.java | 30 + .../beam/runners/direct/CommittedResult.java | 46 + .../beam/runners/direct/CompletionCallback.java | 36 + .../direct/ConsumerTrackingPipelineVisitor.java | 173 +++ .../runners/direct/EmptyTransformEvaluator.java | 50 + .../direct/EncodabilityEnforcementFactory.java | 70 + .../beam/runners/direct/EvaluatorKey.java | 55 + .../runners/direct/ExecutorServiceFactory.java | 33 + .../direct/ExecutorServiceParallelExecutor.java | 478 +++++++ .../runners/direct/FlattenEvaluatorFactory.java | 85 ++ .../runners/direct/ForwardingPTransform.java | 62 + .../direct/GroupByKeyEvaluatorFactory.java | 274 ++++ .../ImmutabilityCheckingBundleFactory.java | 131 ++ .../direct/ImmutabilityEnforcementFactory.java | 103 ++ .../direct/InMemoryWatermarkManager.java | 1327 ++++++++++++++++++ .../runners/direct/InProcessBundleFactory.java | 162 +++ .../direct/InProcessBundleOutputManager.java | 51 + .../direct/InProcessEvaluationContext.java | 425 ++++++ .../direct/InProcessExecutionContext.java | 105 ++ 
.../beam/runners/direct/InProcessExecutor.java | 48 + .../direct/InProcessPipelineOptions.java | 101 ++ .../runners/direct/InProcessPipelineRunner.java | 370 +++++ .../beam/runners/direct/InProcessRegistrar.java | 55 + .../direct/InProcessSideInputContainer.java | 271 ++++ .../runners/direct/InProcessTimerInternals.java | 84 ++ .../direct/InProcessTransformResult.java | 77 + .../direct/KeyedPValueTrackingVisitor.java | 96 ++ .../beam/runners/direct/ModelEnforcement.java | 63 + .../runners/direct/ModelEnforcementFactory.java | 30 + .../beam/runners/direct/NanosOffsetClock.java | 59 + .../direct/PTransformOverrideFactory.java | 33 + .../runners/direct/ParDoInProcessEvaluator.java | 173 +++ .../direct/ParDoMultiEvaluatorFactory.java | 64 + .../direct/ParDoSingleEvaluatorFactory.java | 63 + .../direct/PassthroughTransformEvaluator.java | 49 + .../runners/direct/ShardControlledWrite.java | 81 ++ .../apache/beam/runners/direct/StepAndKey.java | 71 + .../runners/direct/StepTransformResult.java | 165 +++ .../direct/TextIOShardedWriteFactory.java | 78 + .../beam/runners/direct/TransformEvaluator.java | 46 + .../direct/TransformEvaluatorFactory.java | 44 + .../direct/TransformEvaluatorRegistry.java | 77 + .../beam/runners/direct/TransformExecutor.java | 176 +++ .../direct/TransformExecutorService.java | 35 + .../direct/TransformExecutorServices.java | 154 ++ .../direct/UnboundedReadEvaluatorFactory.java | 177 +++ .../runners/direct/ViewEvaluatorFactory.java | 145 ++ .../direct/WatermarkCallbackExecutor.java | 146 ++ .../runners/direct/WindowEvaluatorFactory.java | 131 ++ .../direct/AvroIOShardedWriteFactoryTest.java | 112 ++ .../direct/BoundedReadEvaluatorFactoryTest.java | 290 ++++ .../runners/direct/CommittedResultTest.java | 77 + .../ConsumerTrackingPipelineVisitorTest.java | 272 ++++ .../EncodabilityEnforcementFactoryTest.java | 257 ++++ .../direct/FlattenEvaluatorFactoryTest.java | 141 ++ .../direct/ForwardingPTransformTest.java | 112 ++ 
.../direct/GroupByKeyEvaluatorFactoryTest.java | 183 +++ .../ImmutabilityCheckingBundleFactoryTest.java | 220 +++ .../ImmutabilityEnforcementFactoryTest.java | 128 ++ .../direct/InMemoryWatermarkManagerTest.java | 1168 +++++++++++++++ .../direct/InProcessBundleFactoryTest.java | 223 +++ .../direct/InProcessEvaluationContextTest.java | 526 +++++++ .../direct/InProcessPipelineRegistrarTest.java | 74 + .../direct/InProcessPipelineRunnerTest.java | 78 + .../direct/InProcessSideInputContainerTest.java | 496 +++++++ .../direct/InProcessTimerInternalsTest.java | 133 ++ .../direct/KeyedPValueTrackingVisitorTest.java | 192 +++ .../apache/beam/runners/direct/MockClock.java | 62 + .../direct/ParDoMultiEvaluatorFactoryTest.java | 431 ++++++ .../direct/ParDoSingleEvaluatorFactoryTest.java | 324 +++++ .../direct/TextIOShardedWriteFactoryTest.java | 112 ++ .../direct/TransformExecutorServicesTest.java | 136 ++ .../runners/direct/TransformExecutorTest.java | 538 +++++++ .../UnboundedReadEvaluatorFactoryTest.java | 334 +++++ .../direct/ViewEvaluatorFactoryTest.java | 101 ++ .../direct/WatermarkCallbackExecutorTest.java | 128 ++ .../direct/WindowEvaluatorFactoryTest.java | 222 +++ runners/pom.xml | 1 + .../inprocess/AbstractModelEnforcement.java | 38 - .../inprocess/AvroIOShardedWriteFactory.java | 76 - .../inprocess/BoundedReadEvaluatorFactory.java | 155 -- .../sdk/runners/inprocess/BundleFactory.java | 49 - .../CachedThreadPoolExecutorServiceFactory.java | 44 - .../beam/sdk/runners/inprocess/Clock.java | 30 - .../sdk/runners/inprocess/CommittedResult.java | 46 - .../runners/inprocess/CompletionCallback.java | 36 - .../ConsumerTrackingPipelineVisitor.java | 173 --- .../inprocess/EmptyTransformEvaluator.java | 50 - .../EncodabilityEnforcementFactory.java | 70 - .../sdk/runners/inprocess/EvaluatorKey.java | 55 - .../inprocess/ExecutorServiceFactory.java | 33 - .../ExecutorServiceParallelExecutor.java | 478 ------- .../inprocess/FlattenEvaluatorFactory.java | 85 -- 
.../runners/inprocess/ForwardingPTransform.java | 62 - .../inprocess/GroupByKeyEvaluatorFactory.java | 274 ---- .../ImmutabilityCheckingBundleFactory.java | 131 -- .../ImmutabilityEnforcementFactory.java | 103 -- .../inprocess/InMemoryWatermarkManager.java | 1327 ------------------ .../inprocess/InProcessBundleFactory.java | 162 --- .../inprocess/InProcessBundleOutputManager.java | 51 - .../inprocess/InProcessEvaluationContext.java | 425 ------ .../inprocess/InProcessExecutionContext.java | 105 -- .../runners/inprocess/InProcessExecutor.java | 48 - .../inprocess/InProcessPipelineOptions.java | 101 -- .../inprocess/InProcessPipelineRunner.java | 370 ----- .../runners/inprocess/InProcessRegistrar.java | 55 - .../inprocess/InProcessSideInputContainer.java | 271 ---- .../inprocess/InProcessTimerInternals.java | 84 -- .../inprocess/InProcessTransformResult.java | 77 - .../inprocess/KeyedPValueTrackingVisitor.java | 96 -- .../sdk/runners/inprocess/ModelEnforcement.java | 63 - .../inprocess/ModelEnforcementFactory.java | 30 - .../sdk/runners/inprocess/NanosOffsetClock.java | 59 - .../inprocess/PTransformOverrideFactory.java | 33 - .../inprocess/ParDoInProcessEvaluator.java | 173 --- .../inprocess/ParDoMultiEvaluatorFactory.java | 63 - .../inprocess/ParDoSingleEvaluatorFactory.java | 63 - .../PassthroughTransformEvaluator.java | 49 - .../runners/inprocess/ShardControlledWrite.java | 81 -- .../beam/sdk/runners/inprocess/StepAndKey.java | 71 - .../runners/inprocess/StepTransformResult.java | 165 --- .../inprocess/TextIOShardedWriteFactory.java | 78 - .../runners/inprocess/TransformEvaluator.java | 46 - .../inprocess/TransformEvaluatorFactory.java | 44 - .../inprocess/TransformEvaluatorRegistry.java | 77 - .../runners/inprocess/TransformExecutor.java | 176 --- .../inprocess/TransformExecutorService.java | 35 - .../inprocess/TransformExecutorServices.java | 154 -- .../UnboundedReadEvaluatorFactory.java | 177 --- .../runners/inprocess/ViewEvaluatorFactory.java | 145 -- 
.../inprocess/WatermarkCallbackExecutor.java | 146 -- .../inprocess/WindowEvaluatorFactory.java | 131 -- .../AvroIOShardedWriteFactoryTest.java | 112 -- .../BoundedReadEvaluatorFactoryTest.java | 290 ---- .../runners/inprocess/CommittedResultTest.java | 77 - .../ConsumerTrackingPipelineVisitorTest.java | 272 ---- .../EncodabilityEnforcementFactoryTest.java | 257 ---- .../inprocess/FlattenEvaluatorFactoryTest.java | 141 -- .../inprocess/ForwardingPTransformTest.java | 112 -- .../GroupByKeyEvaluatorFactoryTest.java | 183 --- .../ImmutabilityCheckingBundleFactoryTest.java | 220 --- .../ImmutabilityEnforcementFactoryTest.java | 128 -- .../inprocess/InMemoryWatermarkManagerTest.java | 1168 --------------- .../inprocess/InProcessBundleFactoryTest.java | 223 --- .../InProcessEvaluationContextTest.java | 526 ------- .../InProcessPipelineRegistrarTest.java | 74 - .../inprocess/InProcessPipelineRunnerTest.java | 78 - .../InProcessSideInputContainerTest.java | 496 ------- .../inprocess/InProcessTimerInternalsTest.java | 133 -- .../KeyedPValueTrackingVisitorTest.java | 192 --- .../beam/sdk/runners/inprocess/MockClock.java | 62 - .../ParDoMultiEvaluatorFactoryTest.java | 431 ------ .../ParDoSingleEvaluatorFactoryTest.java | 324 ----- .../TextIOShardedWriteFactoryTest.java | 112 -- .../TransformExecutorServicesTest.java | 136 -- .../inprocess/TransformExecutorTest.java | 538 ------- .../UnboundedReadEvaluatorFactoryTest.java | 334 ----- .../inprocess/ViewEvaluatorFactoryTest.java | 101 -- .../WatermarkCallbackExecutorTest.java | 128 -- .../inprocess/WindowEvaluatorFactoryTest.java | 222 --- sdks/java/pom.xml | 1 - 168 files changed, 14692 insertions(+), 14290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index f1d9e3b..8aad36e 100644 --- a/.travis.yml +++ 
b/.travis.yml @@ -25,6 +25,7 @@ matrix: env: CUSTOM_JDK="openjdk7" MAVEN_OVERRIDE="-DforkCount=0" before_install: + - echo "MAVEN_OPTS='-Xmx2048m -XX:MaxPermSize=512m'" > ~/.mavenrc - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml new file mode 100644 index 0000000..12ba329 --- /dev/null +++ b/runners/direct-java/pom.xml @@ -0,0 +1,400 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>runners-parent</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>direct-runner</artifactId> + + <name>Apache Beam :: Runners :: Direct</name> + + <packaging>jar</packaging> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <goals><goal>analyze-only</goal></goals> + <configuration> + <failOnWarning>true</failOnWarning> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>default-jar</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + <execution> + <id>default-test-jar</id> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Source plugin for generating source and test-source JARs. 
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <id>attach-sources</id> + <phase>verify</phase> + <goals> + <goal>jar-no-fork</goal> + </goals> + </execution> + <execution> + <id>attach-test-sources</id> + <phase>verify</phase> + <goals> + <goal>test-jar-no-fork</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <executions> + <!-- For now, disables integration tests from the SDK as the runner is not ready. --> + <execution> + <id>runnable-on-service-tests</id> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <windowtitle>Apache Beam Direct Runner ${project.version}</windowtitle> + <doctitle>Apache Beam Direct Runner, version ${project.version}</doctitle> + + <subpackages>org.apache.beam.runners.direct</subpackages> + <use>false</use> + <quiet>true</quiet> + <bottom><![CDATA[<br>]]></bottom> + + <offlineLinks> + <offlineLink> + <url>https://developers.google.com/api-client-library/java/google-api-java-client/reference/1.20.0/</url> + <location>${basedir}/../../sdks/java/javadoc/apiclient-docs</location> + </offlineLink> + <offlineLink> + <url>http://avro.apache.org/docs/1.7.7/api/java/</url> + <location>${basedir}/../../sdks/java/javadoc/avro-docs</location> + </offlineLink> + <offlineLink> + <url>https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/</url> + <location>${basedir}/../../sdks/java/javadoc/bq-docs</location> + </offlineLink> + <offlineLink> + <url>https://cloud.google.com/datastore/docs/apis/javadoc/</url> + <location>${basedir}/../../sdks/java/javadoc/datastore-docs</location> + </offlineLink> + 
<offlineLink> + <url>http://docs.guava-libraries.googlecode.com/git-history/release19/javadoc/</url> + <location>${basedir}/../../sdks/java/javadoc/guava-docs</location> + </offlineLink> + <offlineLink> + <url>http://hamcrest.org/JavaHamcrest/javadoc/1.3/</url> + <location>${basedir}/../../sdks/java/javadoc/hamcrest-docs</location> + </offlineLink> + <offlineLink> + <url>http://fasterxml.github.io/jackson-annotations/javadoc/2.7/</url> + <location>${basedir}/../../sdks/java/javadoc/jackson-annotations-docs</location> + </offlineLink> + <offlineLink> + <url>http://fasterxml.github.io/jackson-databind/javadoc/2.7/</url> + <location>${basedir}/../../sdks/java/javadoc/jackson-databind-docs</location> + </offlineLink> + <offlineLink> + <url>http://www.joda.org/joda-time/apidocs</url> + <location>${basedir}/../../sdks/java/javadoc/joda-docs</location> + </offlineLink> + <offlineLink> + <url>http://junit.sourceforge.net/javadoc/</url> + <location>${basedir}/../../sdks/java/javadoc/junit-docs</location> + </offlineLink> + <offlineLink> + <url>https://developers.google.com/api-client-library/java/google-oauth-java-client/reference/1.20.0/</url> + <location>${basedir}/../../sdks/java/javadoc/oauth-docs</location> + </offlineLink> + </offlineLinks> + </configuration> + <executions> + <execution> + <goals> + <goal>jar</goal> + </goals> + <phase>package</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.1</version> + <executions> + <!-- In the first phase, we pick dependencies and relocate them. 
--> + <execution> + <id>bundle-and-repackage</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>true</shadeTestJar> + <artifactSet> + <includes> + <include>com.google.guava:guava</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <!-- TODO: Once ready, change the following pattern to 'com' + only, exclude 'org.apache.beam.**', and remove + the second relocation. --> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.beam.runners.direct.repackaged.com.google.common</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + + <!-- In the second phase, we pick remaining dependencies and bundle + them without repackaging. --> + <execution> + <id>bundle-rest-without-repackaging</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>true</shadeTestJar> + <finalName>${project.artifactId}-bundled-${project.version}</finalName> + <artifactSet> + <excludes> + <exclude>com.google.guava:guava</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Coverage analysis for unit tests. 
--> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>java-sdk-all</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client</artifactId> + <version>${google-clients.version}</version> + <exclusions> + <!-- Exclude an old version of guava that is being pulled + in by a transitive dependency of google-api-client --> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava-jdk5</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.http-client</groupId> + <artifactId>google-http-client-protobuf</artifactId> + <version>${google-clients.version}</version> + <exclusions> + <!-- Exclude an old version of guava that is being pulled + in by a transitive dependency of google-api-client --> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava-jdk5</artifactId> + </exclusion> + </exclusions> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <!-- If updating version, please update the javadoc offlineLink --> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava-testlib</artifactId> + <version>${guava.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>${joda.version}</version> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <version>${jsr305.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + 
<version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + + <!-- build dependencies --> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <version>1.0-rc2</version> + <optional>true</optional> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <version>1.1</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>${hamcrest.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <version>${slf4j.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>java-sdk-all</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java new file mode 100644 index 0000000..948beb6 --- /dev/null +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * An abstract {@link ModelEnforcement} that provides default empty implementations for each method. + */ +abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> { + @Override + public void beforeElement(WindowedValue<T> element) {} + + @Override + public void afterElement(WindowedValue<T> element) {} + + @Override + public void afterFinish( + CommittedBundle<T> input, + InProcessTransformResult result, + Iterable<? 
extends CommittedBundle<?>> outputs) {} +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java new file mode 100644 index 0000000..7422f27 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +class AvroIOShardedWriteFactory implements PTransformOverrideFactory { + @Override + public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override( + PTransform<InputT, OutputT> transform) { + if (transform instanceof AvroIO.Write.Bound) { + @SuppressWarnings("unchecked") + AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) transform; + if (originalWrite.getNumShards() > 1 + || (originalWrite.getNumShards() == 1 + && !"".equals(originalWrite.getShardNameTemplate()))) { + @SuppressWarnings("unchecked") + PTransform<InputT, OutputT> override = + (PTransform<InputT, OutputT>) new AvroIOShardedWrite<InputT>(originalWrite); + return override; + } + } + return transform; + } + + private class AvroIOShardedWrite<InputT> extends ShardControlledWrite<InputT> { + private final AvroIO.Write.Bound<InputT> initial; + + private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) { + this.initial = initial; + } + + @Override + int getNumShards() { + return initial.getNumShards(); + } + + @Override + PTransform<? 
super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) { + String shardName = + IOChannelUtils.constructName( + initial.getFilenamePrefix(), + initial.getShardNameTemplate(), + initial.getFilenameSuffix(), + shardNum, + getNumShards()); + return initial.withoutSharding().to(shardName).withSuffix(""); + } + + @Override + protected PTransform<PCollection<InputT>, PDone> delegate() { + return initial; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java new file mode 100644 index 0000000..3822d3b --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; + +/** + * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} + * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. + */ +final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { + /* + * An evaluator for a Source is stateful, to ensure data is not read multiple times. + * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is + * retriggered. + */ + private final ConcurrentMap<EvaluatorKey, Queue<? 
extends BoundedReadEvaluator<?>>> + sourceEvaluators = new ConcurrentHashMap<>(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public <InputT> TransformEvaluator<InputT> forApplication( + AppliedPTransform<?, ?, ?> application, + @Nullable CommittedBundle<?> inputBundle, + InProcessEvaluationContext evaluationContext) + throws IOException { + return getTransformEvaluator((AppliedPTransform) application, evaluationContext); + } + + private <OutputT> TransformEvaluator<?> getTransformEvaluator( + final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, + final InProcessEvaluationContext evaluationContext) { + BoundedReadEvaluator<?> evaluator = + getTransformEvaluatorQueue(transform, evaluationContext).poll(); + if (evaluator == null) { + return EmptyTransformEvaluator.create(transform); + } + return evaluator; + } + + /** + * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the + * provided application of {@link Bounded Read.Bounded}, initializing it if required. + * + * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has + * already done so. + */ + @SuppressWarnings("unchecked") + private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue( + final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, + final InProcessEvaluationContext evaluationContext) { + // Key by the application and the context the evaluation is occurring in (which call to + // Pipeline#run). 
+ EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); + Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue = + (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); + if (evaluatorQueue == null) { + evaluatorQueue = new ConcurrentLinkedQueue<>(); + if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { + // If no queue existed in the evaluators, add an evaluator to initialize the evaluator + // factory for this transform + BoundedSource<OutputT> source = transform.getTransform().getSource(); + BoundedReadEvaluator<OutputT> evaluator = + new BoundedReadEvaluator<OutputT>(transform, evaluationContext, source); + evaluatorQueue.offer(evaluator); + } else { + // otherwise return the existing Queue that arrived before us + evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key); + } + } + return evaluatorQueue; + } + + /** + * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource}, + * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator + * creates the {@link BoundedReader} and consumes all available input. + * + * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and + * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source + * may produce duplicate elements. + */ + private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> { + private final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform; + private final InProcessEvaluationContext evaluationContext; + /** + * The source being read from by this {@link BoundedReadEvaluator}. This may not be the same + * as the source derived from {@link #transform} due to splitting. 
+ */ + private BoundedSource<OutputT> source; + + public BoundedReadEvaluator( + AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>> transform, + InProcessEvaluationContext evaluationContext, + BoundedSource<OutputT> source) { + this.transform = transform; + this.evaluationContext = evaluationContext; + this.source = source; + } + + @Override + public void processElement(WindowedValue<Object> element) {} + + @Override + public InProcessTransformResult finishBundle() throws IOException { + try (final BoundedReader<OutputT> reader = + source.createReader(evaluationContext.getPipelineOptions());) { + boolean contentsRemaining = reader.start(); + UncommittedBundle<OutputT> output = + evaluationContext.createRootBundle(transform.getOutput()); + while (contentsRemaining) { + output.add( + WindowedValue.timestampedValueInGlobalWindow( + reader.getCurrent(), reader.getCurrentTimestamp())); + contentsRemaining = reader.advance(); + } + return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MAX_VALUE) + .addOutput(output) + .build(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java new file mode 100644 index 0000000..34529e7 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; + +/** + * A factory that creates {@link UncommittedBundle UncommittedBundles}. + */ +public interface BundleFactory { + /** + * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to + * the {@code output} {@link PCollection}. + */ + public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output); + + /** + * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle + * belong to the {@code output} {@link PCollection}. + */ + public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output); + + /** + * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by + * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle + * belong to the {@code output} {@link PCollection}. 
+ */ + public <T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, Object key, PCollection<T> output); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java new file mode 100644 index 0000000..5b8e5fc --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CachedThreadPoolExecutorServiceFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A {@link ExecutorServiceFactory} that produces cached thread pools via + * {@link Executors#newCachedThreadPool()}. 
+ */ +class CachedThreadPoolExecutorServiceFactory + implements DefaultValueFactory<ExecutorServiceFactory>, ExecutorServiceFactory { + private static final CachedThreadPoolExecutorServiceFactory INSTANCE = + new CachedThreadPoolExecutorServiceFactory(); + + @Override + public ExecutorServiceFactory create(PipelineOptions options) { + return INSTANCE; + } + + @Override + public ExecutorService create() { + return Executors.newCachedThreadPool(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java new file mode 100644 index 0000000..88f8aab --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/Clock.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.joda.time.Instant; + +/** + * Access to the current time. 
+ */ +public interface Clock { + /** + * Returns the current time as an {@link Instant}. + */ + Instant now(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java new file mode 100644 index 0000000..d15e012 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import com.google.auto.value.AutoValue; + +/** + * A {@link InProcessTransformResult} that has been committed. + */ +@AutoValue +abstract class CommittedResult { + /** + * Returns the {@link AppliedPTransform} that produced this result. 
+ */ + public abstract AppliedPTransform<?, ?, ?> getTransform(); + + /** + * Returns the outputs produced by the transform. + */ + public abstract Iterable<? extends CommittedBundle<?>> getOutputs(); + + public static CommittedResult create( + InProcessTransformResult original, Iterable<? extends CommittedBundle<?>> outputs) { + return new AutoValue_CommittedResult(original.getTransform(), + outputs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java new file mode 100644 index 0000000..7c2c068 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; + +/** + * A callback for completing a bundle of input. 
+ */ +interface CompletionCallback { + /** + * Handle a successful result, returning the committed outputs of the result. + */ + CommittedResult handleResult( + CommittedBundle<?> inputBundle, InProcessTransformResult result); + + /** + * Handle a result that terminated abnormally due to the provided {@link Throwable}. + */ + void handleThrowable(CommittedBundle<?> inputBundle, Throwable t); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java new file mode 100644 index 0000000..c790463 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.PValue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the + * {@link Pipeline}. This is used to schedule consuming {@link PTransform PTransforms} to consume + * input after the upstream transform has produced and committed output. + */ +public class ConsumerTrackingPipelineVisitor implements PipelineVisitor { + private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>(); + private Collection<AppliedPTransform<?, ?, ?>> rootTransforms = new ArrayList<>(); + private Collection<PCollectionView<?>> views = new ArrayList<>(); + private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>(); + private Set<PValue> toFinalize = new HashSet<>(); + private int numTransforms = 0; + private boolean finalized = false; + + @Override + public void enterCompositeTransform(TransformTreeNode node) { + checkState( + !finalized, + "Attempting to traverse a pipeline (node %s) with a %s " + + "which has already visited a Pipeline and is finalized", + node.getFullName(), + ConsumerTrackingPipelineVisitor.class.getSimpleName()); + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + checkState( + !finalized, + "Attempting to traverse a pipeline (node %s) 
with a %s which is already finalized", + node.getFullName(), + ConsumerTrackingPipelineVisitor.class.getSimpleName()); + if (node.isRootNode()) { + finalized = true; + } + } + + @Override + public void visitTransform(TransformTreeNode node) { + toFinalize.removeAll(node.getInput().expand()); + AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node); + stepNames.put(appliedTransform, genStepName()); + if (node.getInput().expand().isEmpty()) { + rootTransforms.add(appliedTransform); + } else { + for (PValue value : node.getInput().expand()) { + valueToConsumers.get(value).add(appliedTransform); + } + } + } + + private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformTreeNode node) { + @SuppressWarnings({"rawtypes", "unchecked"}) + AppliedPTransform<?, ?, ?> application = AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) node.getTransform()); + return application; + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + toFinalize.add(value); + for (PValue expandedValue : value.expand()) { + valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, ?, ?>>()); + if (expandedValue instanceof PCollectionView) { + views.add((PCollectionView<?>) expandedValue); + } + expandedValue.recordAsOutput(getAppliedTransform(producer)); + } + value.recordAsOutput(getAppliedTransform(producer)); + } + + private String genStepName() { + return String.format("s%s", numTransforms++); + } + + + /** + * Returns a mapping of each fully-expanded {@link PValue} to each + * {@link AppliedPTransform} that consumes it. For each AppliedPTransform in the collection + * returned from {@code getValueToConsumers().get(PValue)}, + * {@code AppliedPTransform#getInput().expand()} will contain the argument {@link PValue}. 
+ */ + public Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> getValueToConsumers() { + checkState( + finalized, + "Can't call getValueToConsumers before the Pipeline has been completely traversed"); + + return valueToConsumers; + } + + /** + * Returns the mapping for each {@link AppliedPTransform} in the {@link Pipeline} to a unique step + * name. + */ + public Map<AppliedPTransform<?, ?, ?>, String> getStepNames() { + checkState( + finalized, "Can't call getStepNames before the Pipeline has been completely traversed"); + + return stepNames; + } + + /** + * Returns the root transforms of the {@link Pipeline}. A root {@link AppliedPTransform} consumes + * a {@link PInput} where the {@link PInput#expand()} returns an empty collection. + */ + public Collection<AppliedPTransform<?, ?, ?>> getRootTransforms() { + checkState( + finalized, + "Can't call getRootTransforms before the Pipeline has been completely traversed"); + + return rootTransforms; + } + + /** + * Returns all of the {@link PCollectionView PCollectionViews} contained in the visited + * {@link Pipeline}. + */ + public Collection<PCollectionView<?>> getViews() { + checkState(finalized, "Can't call getViews before the Pipeline has been completely traversed"); + + return views; + } + + /** + * Returns all of the {@link PValue PValues} that have been produced but not consumed. These + * {@link PValue PValues} should be finalized by the {@link PipelineRunner} before the + * {@link Pipeline} is executed. 
+ */ + public Set<PValue> getUnfinalizedPValues() { + checkState( + finalized, + "Can't call getUnfinalizedPValues before the Pipeline has been completely traversed"); + + return toFinalize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java new file mode 100644 index 0000000..5379038 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyTransformEvaluator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A {@link TransformEvaluator} that ignores all input and produces no output. 
The result of + * invoking {@link #finishBundle()} on this evaluator is to return an + * {@link InProcessTransformResult} with no elements and a timestamp hold equal to + * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold + * will not affect the watermark. + */ +final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> { + public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?> transform) { + return new EmptyTransformEvaluator<T>(transform); + } + + private final AppliedPTransform<?, ?, ?> transform; + + private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) { + this.transform = transform; + } + + @Override + public void processElement(WindowedValue<T> element) throws Exception {} + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java new file mode 100644 index 0000000..ccf4c2b --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +/** + * Enforces that all elements in a {@link PCollection} can be encoded using that + * {@link PCollection PCollection's} {@link Coder}. 
+ */ +class EncodabilityEnforcementFactory implements ModelEnforcementFactory { + public static EncodabilityEnforcementFactory create() { + return new EncodabilityEnforcementFactory(); + } + + @Override + public <T> ModelEnforcement<T> forBundle( + CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { + return new EncodabilityEnforcement<>(input); + } + + private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> { + private Coder<T> coder; + + public EncodabilityEnforcement(CommittedBundle<T> input) { + coder = input.getPCollection().getCoder(); + } + + @Override + public void beforeElement(WindowedValue<T> element) { + try { + T clone = CoderUtils.clone(coder, element.getValue()); + if (coder.consistentWithEquals()) { + checkArgument( + coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)), + "Coder %s of class %s does not maintain structural value equality" + + " on input element %s", + coder, + coder.getClass().getSimpleName(), + element.getValue()); + } + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java new file mode 100644 index 0000000..1c36751 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import java.util.Objects; + +/** + * A (Transform, Pipeline Execution) key for stateful evaluators. + * + * <p>Source evaluators are stateful to ensure data is not read multiple times. Evaluators are cached + * to ensure that the reader is not restarted if the evaluator is retriggered. An + * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be executed without sharing + * the same evaluators. 
+ */ +final class EvaluatorKey { + private final AppliedPTransform<?, ?, ?> transform; + private final InProcessEvaluationContext context; + + public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, InProcessEvaluationContext context) { + this.transform = transform; + this.context = context; + } + + @Override + public int hashCode() { + return Objects.hash(transform, context); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof EvaluatorKey)) { + return false; + } + EvaluatorKey that = (EvaluatorKey) other; + return Objects.equals(this.transform, that.transform) + && Objects.equals(this.context, that.context); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java new file mode 100644 index 0000000..91dc258 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import java.util.concurrent.ExecutorService; + +/** + * A factory that creates {@link ExecutorService ExecutorServices}. + * {@link ExecutorService ExecutorServices} created by this factory should be independent of one + * another (e.g., if any executor is shut down the remaining executors should continue to process + * work). + */ +public interface ExecutorServiceFactory { + /** + * Create a new {@link ExecutorService}. + */ + ExecutorService create(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java new file mode 100644 index 0000000..18af363 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; + +import javax.annotation.Nullable; + +/** + * An {@link InProcessExecutor} that uses an underlying {@link ExecutorService} and + * {@link InProcessEvaluationContext} to execute a {@link Pipeline}. 
+ */ +final class ExecutorServiceParallelExecutor implements InProcessExecutor { + private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class); + + private final ExecutorService executorService; + + private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers; + private final Set<PValue> keyedPValues; + private final TransformEvaluatorRegistry registry; + @SuppressWarnings("rawtypes") + private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> + transformEnforcements; + + private final InProcessEvaluationContext evaluationContext; + + private final LoadingCache<StepAndKey, TransformExecutorService> executorServices; + private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors; + + private final Queue<ExecutorUpdate> allUpdates; + private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates; + + private final TransformExecutorService parallelExecutorService; + private final CompletionCallback defaultCompletionCallback; + + private Collection<AppliedPTransform<?, ?, ?>> rootNodes; + + public static ExecutorServiceParallelExecutor create( + ExecutorService executorService, + Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, + Set<PValue> keyedPValues, + TransformEvaluatorRegistry registry, + @SuppressWarnings("rawtypes") + Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, + InProcessEvaluationContext context) { + return new ExecutorServiceParallelExecutor( + executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context); + } + + private ExecutorServiceParallelExecutor( + ExecutorService executorService, + Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, + Set<PValue> keyedPValues, + TransformEvaluatorRegistry registry, + @SuppressWarnings("rawtypes") + Map<Class<? 
extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, + InProcessEvaluationContext context) { + this.executorService = executorService; + this.valueToConsumers = valueToConsumers; + this.keyedPValues = keyedPValues; + this.registry = registry; + this.transformEnforcements = transformEnforcements; + this.evaluationContext = context; + + scheduledExecutors = new ConcurrentHashMap<>(); + // Weak Values allows TransformExecutorServices that are no longer in use to be reclaimed. + // Executing TransformExecutorServices have a strong reference to their TransformExecutorService + // which stops the TransformExecutorServices from being prematurely garbage collected + executorServices = + CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader()); + + this.allUpdates = new ConcurrentLinkedQueue<>(); + this.visibleUpdates = new ArrayBlockingQueue<>(20); + + parallelExecutorService = + TransformExecutorServices.parallel(executorService, scheduledExecutors); + defaultCompletionCallback = new DefaultCompletionCallback(); + } + + private CacheLoader<StepAndKey, TransformExecutorService> + serialTransformExecutorServiceCacheLoader() { + return new CacheLoader<StepAndKey, TransformExecutorService>() { + @Override + public TransformExecutorService load(StepAndKey stepAndKey) throws Exception { + return TransformExecutorServices.serial(executorService, scheduledExecutors); + } + }; + } + + @Override + public void start(Collection<AppliedPTransform<?, ?, ?>> roots) { + rootNodes = ImmutableList.copyOf(roots); + Runnable monitorRunnable = new MonitorRunnable(); + executorService.submit(monitorRunnable); + } + + @SuppressWarnings("unchecked") + public void scheduleConsumption( + AppliedPTransform<?, ?, ?> consumer, + @Nullable CommittedBundle<?> bundle, + CompletionCallback onComplete) { + evaluateBundle(consumer, bundle, onComplete); + } + + private <T> void evaluateBundle( + final AppliedPTransform<?, ?, ?> transform, + 
@Nullable final CommittedBundle<T> bundle, + final CompletionCallback onComplete) { + TransformExecutorService transformExecutor; + + if (bundle != null && isKeyed(bundle.getPCollection())) { + final StepAndKey stepAndKey = + StepAndKey.of(transform, bundle == null ? null : bundle.getKey()); + // This executor will remain reachable until it has executed all scheduled transforms. + // The TransformExecutors keep a strong reference to the Executor, the ExecutorService keeps + // a reference to the scheduled TransformExecutor callable. Follow-up TransformExecutors + // (scheduled due to the completion of another TransformExecutor) are provided to the + // ExecutorService before the Earlier TransformExecutor callable completes. + transformExecutor = executorServices.getUnchecked(stepAndKey); + } else { + transformExecutor = parallelExecutorService; + } + + Collection<ModelEnforcementFactory> enforcements = + MoreObjects.firstNonNull( + transformEnforcements.get(transform.getTransform().getClass()), + Collections.<ModelEnforcementFactory>emptyList()); + + TransformExecutor<T> callable = + TransformExecutor.create( + registry, + enforcements, + evaluationContext, + bundle, + transform, + onComplete, + transformExecutor); + transformExecutor.schedule(callable); + } + + private boolean isKeyed(PValue pvalue) { + return keyedPValues.contains(pvalue); + } + + private void scheduleConsumers(CommittedBundle<?> bundle) { + for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) { + scheduleConsumption(consumer, bundle, defaultCompletionCallback); + } + } + + @Override + public void awaitCompletion() throws Throwable { + VisibleExecutorUpdate update; + do { + update = visibleUpdates.take(); + if (update.throwable.isPresent()) { + throw update.throwable.get(); + } + } while (!update.isDone()); + executorService.shutdown(); + } + + /** + * The default {@link CompletionCallback}. 
The default completion callback is used to complete + * transform evaluations that are triggered due to the arrival of elements from an upstream + * transform, or for a source transform. + */ + private class DefaultCompletionCallback implements CompletionCallback { + @Override + public CommittedResult handleResult( + CommittedBundle<?> inputBundle, InProcessTransformResult result) { + CommittedResult committedResult = + evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result); + for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { + allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + } + return committedResult; + } + + @Override + public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { + allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + } + } + + /** + * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection + * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the + * timers used to create the input to the {@link InProcessEvaluationContext evaluation context} + * as part of the result. + */ + private class TimerCompletionCallback implements CompletionCallback { + private final Iterable<TimerData> timers; + + private TimerCompletionCallback(Iterable<TimerData> timers) { + this.timers = timers; + } + + @Override + public CommittedResult handleResult( + CommittedBundle<?> inputBundle, InProcessTransformResult result) { + CommittedResult committedResult = + evaluationContext.handleResult(inputBundle, timers, result); + for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { + allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); + } + return committedResult; + } + + @Override + public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { + allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + } + } + + /** + * An internal status update on the state of the executor. 
+ * + * Used to signal when the executor should be shut down (due to an exception). + */ + private static class ExecutorUpdate { + private final Optional<? extends CommittedBundle<?>> bundle; + private final Optional<? extends Throwable> throwable; + + public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) { + return new ExecutorUpdate(bundle, null); + } + + public static ExecutorUpdate fromThrowable(Throwable t) { + return new ExecutorUpdate(null, t); + } + + private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) { + this.bundle = Optional.fromNullable(producedBundle); + this.throwable = Optional.fromNullable(throwable); + } + + public Optional<? extends CommittedBundle<?>> getBundle() { + return bundle; + } + + public Optional<? extends Throwable> getException() { + return throwable; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(ExecutorUpdate.class) + .add("bundle", bundle) + .add("exception", throwable) + .toString(); + } + } + + /** + * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to + * return normally or throw an exception. + */ + private static class VisibleExecutorUpdate { + private final Optional<? 
extends Throwable> throwable; + private final boolean done; + + public static VisibleExecutorUpdate fromThrowable(Throwable e) { + return new VisibleExecutorUpdate(false, e); + } + + public static VisibleExecutorUpdate finished() { + return new VisibleExecutorUpdate(true, null); + } + + private VisibleExecutorUpdate(boolean done, @Nullable Throwable exception) { + this.throwable = Optional.fromNullable(exception); + this.done = done; + } + + public boolean isDone() { + return done; + } + } + + private class MonitorRunnable implements Runnable { + private final String runnableName = + String.format( + "%s$%s-monitor", + evaluationContext.getPipelineOptions().getAppName(), + ExecutorServiceParallelExecutor.class.getSimpleName()); + + @Override + public void run() { + String oldName = Thread.currentThread().getName(); + Thread.currentThread().setName(runnableName); + try { + ExecutorUpdate update = allUpdates.poll(); + // pull all of the pending work off of the queue + while (update != null) { + LOG.debug("Executor Update: {}", update); + if (update.getBundle().isPresent()) { + scheduleConsumers(update.getBundle().get()); + } else if (update.getException().isPresent()) { + visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); + } + update = allUpdates.poll(); + } + boolean timersFired = fireTimers(); + addWorkIfNecessary(timersFired); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Monitor died due to being interrupted"); + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) { + visibleUpdates.poll(); + } + } catch (Throwable t) { + LOG.error("Monitor thread died due to throwable", t); + while (!visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(t))) { + visibleUpdates.poll(); + } + } finally { + if (!shouldShutdown()) { + // The monitor thread should always be scheduled; but we only need to be scheduled once + executorService.submit(this); + } + 
Thread.currentThread().setName(oldName); + } + } + + /** + * Fires any available timers. Returns true if at least one timer was fired. + */ + private boolean fireTimers() throws Exception { + try { + boolean firedTimers = false; + for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers : + evaluationContext.extractFiredTimers().entrySet()) { + AppliedPTransform<?, ?, ?> transform = transformTimers.getKey(); + for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) { + for (TimeDomain domain : TimeDomain.values()) { + Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain); + if (delivery.isEmpty()) { + continue; + } + KeyedWorkItem<Object, Object> work = + KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle<?> bundle = + evaluationContext + .createKeyedBundle( + null, keyTimers.getKey(), (PCollection) transform.getInput()) + .add(WindowedValue.valueInEmptyWindows(work)) + .commit(Instant.now()); + scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); + firedTimers = true; + } + } + } + return firedTimers; + } catch (Exception e) { + LOG.error("Internal Error while delivering timers", e); + throw e; + } + } + + private boolean shouldShutdown() { + if (evaluationContext.isDone()) { + LOG.debug("Pipeline is finished. Shutting down. {}"); + while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { + visibleUpdates.poll(); + } + executorService.shutdown(); + return true; + } + return false; + } + + /** + * If all active {@link TransformExecutor TransformExecutors} are in a blocked state, + * add more work from root nodes that may have additional work. This ensures that if a pipeline + * has elements available from the root nodes it will add those elements when necessary. 
+ */ + private void addWorkIfNecessary(boolean firedTimers) { + // If any timers have fired, they will add more work; We don't need to add more + if (firedTimers) { + return; + } + for (TransformExecutor<?> executor : scheduledExecutors.keySet()) { + if (!isExecutorBlocked(executor)) { + // We have at least one executor that can proceed without adding additional work + return; + } + } + // All current TransformExecutors are blocked; add more work from the roots. + for (AppliedPTransform<?, ?, ?> root : rootNodes) { + if (!evaluationContext.isDone(root)) { + scheduleConsumption(root, null, defaultCompletionCallback); + } + } + } + + /** + * Return true if the provided executor might make more progress if no action is taken. + * + * <p>May return false even if all executor threads are currently blocked or cleaning up, as + * these can cause more work to be scheduled. If this does not occur, after these calls + * terminate, future calls will return true if all executors are waiting. + */ + private boolean isExecutorBlocked(TransformExecutor<?> executor) { + Thread thread = executor.getThread(); + if (thread == null) { + return false; + } + switch (thread.getState()) { + case TERMINATED: + throw new IllegalStateException(String.format( + "Unexpectedly encountered a Terminated TransformExecutor %s", executor)); + case WAITING: + case TIMED_WAITING: + // The thread is waiting for some external input. Adding more work may cause the thread + // to stop waiting (e.g. the thread is waiting on an unbounded side input) + return true; + case BLOCKED: + // The executor is blocked on acquisition of a java monitor. This usually means it is + // making a call to the EvaluationContext, but not a model-blocking call - and will + // eventually complete, at which point we may reevaluate. + default: + // NEW and RUNNABLE threads can make progress + return false; + } + } + } +}