Use a DirectExecutor for Watermark Callbacks. This fixes a resource leak in which the callback executor service was not shut down along with the rest of the DirectRunner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee1297e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee1297e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee1297e2 Branch: refs/heads/master Commit: ee1297e21a481fbea52475c0732526a0441d03cb Parents: d53e96a Author: Thomas Groh <[email protected]> Authored: Tue Jun 7 17:50:38 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu Jun 9 15:36:48 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/InProcessEvaluationContext.java | 5 +++-- .../beam/runners/direct/WatermarkCallbackExecutor.java | 10 +++++----- 2 files changed, 8 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java index 981a842..db8baa0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -46,13 +46,13 @@ import org.apache.beam.sdk.values.PValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import 
java.util.concurrent.Executors; import javax.annotation.Nullable; @@ -130,7 +130,8 @@ class InProcessEvaluationContext { this.applicationStateInternals = new ConcurrentHashMap<>(); this.mergedCounters = new CounterSet(); - this.callbackExecutor = WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor()); + this.callbackExecutor = + WatermarkCallbackExecutor.create(MoreExecutors.directExecutor()); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java index 1c9b050..0f73b1d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java @@ -29,7 +29,7 @@ import org.joda.time.Instant; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; /** * Executes callbacks that occur based on the progression of the watermark per-step. @@ -51,15 +51,15 @@ class WatermarkCallbackExecutor { /** * Create a new {@link WatermarkCallbackExecutor}. 
*/ - public static WatermarkCallbackExecutor create(ExecutorService executor) { + public static WatermarkCallbackExecutor create(Executor executor) { return new WatermarkCallbackExecutor(executor); } private final ConcurrentMap<AppliedPTransform<?, ?, ?>, PriorityQueue<WatermarkCallback>> callbacks; - private final ExecutorService executor; + private final Executor executor; - private WatermarkCallbackExecutor(ExecutorService executor) { + private WatermarkCallbackExecutor(Executor executor) { this.callbacks = new ConcurrentHashMap<>(); this.executor = executor; } @@ -101,7 +101,7 @@ class WatermarkCallbackExecutor { } synchronized (callbackQueue) { while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) { - executor.submit(callbackQueue.poll().getCallback()); + executor.execute(callbackQueue.poll().getCallback()); } } }
