Use a DirectExecutor for Watermark Callbacks

This fixes a resource leak where the executor service is not properly
shut down with the rest of the DirectRunner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee1297e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee1297e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee1297e2

Branch: refs/heads/master
Commit: ee1297e21a481fbea52475c0732526a0441d03cb
Parents: d53e96a
Author: Thomas Groh <[email protected]>
Authored: Tue Jun 7 17:50:38 2016 -0700
Committer: Thomas Groh <[email protected]>
Committed: Thu Jun 9 15:36:48 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/InProcessEvaluationContext.java   |  5 +++--
 .../beam/runners/direct/WatermarkCallbackExecutor.java    | 10 +++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index 981a842..db8baa0 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -46,13 +46,13 @@ import org.apache.beam.sdk.values.PValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 
 import javax.annotation.Nullable;
 
@@ -130,7 +130,8 @@ class InProcessEvaluationContext {
     this.applicationStateInternals = new ConcurrentHashMap<>();
     this.mergedCounters = new CounterSet();
 
-    this.callbackExecutor = 
WatermarkCallbackExecutor.create(Executors.newSingleThreadExecutor());
+    this.callbackExecutor =
+        WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee1297e2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index 1c9b050..0f73b1d 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 
 /**
  * Executes callbacks that occur based on the progression of the watermark 
per-step.
@@ -51,15 +51,15 @@ class WatermarkCallbackExecutor {
   /**
    * Create a new {@link WatermarkCallbackExecutor}.
    */
-  public static WatermarkCallbackExecutor create(ExecutorService executor) {
+  public static WatermarkCallbackExecutor create(Executor executor) {
     return new WatermarkCallbackExecutor(executor);
   }
 
   private final ConcurrentMap<AppliedPTransform<?, ?, ?>, 
PriorityQueue<WatermarkCallback>>
       callbacks;
-  private final ExecutorService executor;
+  private final Executor executor;
 
-  private WatermarkCallbackExecutor(ExecutorService executor) {
+  private WatermarkCallbackExecutor(Executor executor) {
     this.callbacks = new ConcurrentHashMap<>();
     this.executor = executor;
   }
@@ -101,7 +101,7 @@ class WatermarkCallbackExecutor {
     }
     synchronized (callbackQueue) {
       while (!callbackQueue.isEmpty() && 
callbackQueue.peek().shouldFire(watermark)) {
-        executor.submit(callbackQueue.poll().getCallback());
+        executor.execute(callbackQueue.poll().getCallback());
       }
     }
   }

Reply via email to