Convert Executor Services to use Daemon Threads. This will cause the DirectRunner to automatically shut down when the worker threads are shut down.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/13e51c9c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/13e51c9c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/13e51c9c Branch: refs/heads/master Commit: 13e51c9cd114eec73c47b71f46214dd92ee81048 Parents: e1791c3 Author: Dan Halperin <[email protected]> Authored: Fri May 5 19:49:29 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon May 8 12:52:09 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/BoundedReadEvaluatorFactory.java | 13 ++++++++++++- .../org/apache/beam/runners/direct/DirectMetrics.java | 10 +++++++++- .../apache/beam/runners/direct/EvaluationContext.java | 3 +-- .../direct/ExecutorServiceParallelExecutor.java | 12 +++++++++++- .../SplittableProcessElementsEvaluatorFactory.java | 11 ++++++++++- 5 files changed, 43 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 99a0fca..76db861 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -21,7 +21,9 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; import 
com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.util.Collection; import java.util.List; @@ -57,7 +59,16 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { */ private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0; private final EvaluationContext evaluationContext; - @VisibleForTesting final ExecutorService executor = Executors.newCachedThreadPool(); + + // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner. + @VisibleForTesting + final ExecutorService executor = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setThreadFactory(MoreExecutors.platformThreadFactory()) + .setDaemon(true) + .setNameFormat("direct-dynamic-split-requester") + .build()); private final long minimumDynamicSplitSize; http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index b6ca492..b7cd6e7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -21,6 +21,8 @@ import static java.util.Arrays.asList; import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.Map; import java.util.Map.Entry; @@ -51,7 +53,13 @@ import org.apache.beam.sdk.metrics.MetricsMap; class DirectMetrics extends MetricResults { // TODO: (BEAM-723) Create a shared 
ExecutorService for maintenance tasks in the DirectRunner. - private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool(); + private static final ExecutorService COUNTER_COMMITTER = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setThreadFactory(MoreExecutors.platformThreadFactory()) + .setDaemon(true) + .setNameFormat("direct-metrics-counter-committer") + .build()); private interface MetricAggregation<UpdateT, ResultT> { UpdateT zero(); http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 362ff91..c627119 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -123,8 +123,7 @@ class EvaluationContext { this.applicationStateInternals = new ConcurrentHashMap<>(); this.metrics = new DirectMetrics(); - this.callbackExecutor = - WatermarkCallbackExecutor.create(MoreExecutors.directExecutor()); + this.callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor()); } public void initialize( http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 02fb11a..b7f4732 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -28,6 +28,8 @@ import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -132,7 +134,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) { this.targetParallelism = targetParallelism; - this.executorService = Executors.newFixedThreadPool(targetParallelism); + // Don't use Daemon threads for workers. 
The Pipeline should continue to execute even if there + // are no other active threads (for example, because waitUntilFinish was not called) + this.executorService = + Executors.newFixedThreadPool( + targetParallelism, + new ThreadFactoryBuilder() + .setThreadFactory(MoreExecutors.platformThreadFactory()) + .setNameFormat("direct-runner-worker") + .build()); this.graph = graph; this.rootProviderRegistry = rootProviderRegistry; this.registry = registry; http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 2797233..f490b0b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Collection; import java.util.List; import java.util.concurrent.Executors; @@ -171,7 +173,14 @@ class SplittableProcessElementsEvaluatorFactory< outputWindowedValue, evaluationContext.createSideInputReader(transform.getSideInputs()), // TODO: For better performance, use a higher-level executor? - Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), + // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the + // DirectRunner. 
+ Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setThreadFactory(MoreExecutors.platformThreadFactory()) + .setDaemon(true) + .setNameFormat("direct-splittable-process-element-checkpoint-executor") + .build()), 10000, Duration.standardSeconds(10)));
