Repository: incubator-beam
Updated Branches:
  refs/heads/master 983d467a7 -> d6adbbf96


Shut down the InProcessPipelineRunner after Terminating Abnormally

Ensure that the executor service is shut down, and the monitor is not
rescheduled, after an exception is thrown.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59113474
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59113474
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59113474

Branch: refs/heads/master
Commit: 591134749f2d24a7d6550e0bb00845a36cdb1616
Parents: 2173000
Author: Thomas Groh <[email protected]>
Authored: Wed Jun 8 12:00:05 2016 -0700
Committer: Thomas Groh <[email protected]>
Committed: Wed Jun 8 12:00:05 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java    | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59113474/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 3129145..980d764 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -351,6 +351,8 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
         evaluationContext.getPipelineOptions().getAppName(),
         ExecutorServiceParallelExecutor.class.getSimpleName());
 
+    private boolean exceptionThrown = false;
+
     @Override
     public void run() {
       String oldName = Thread.currentThread().getName();
@@ -366,6 +368,7 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
             scheduleConsumers(update);
           } else if (update.getException().isPresent()) {
             
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
+            exceptionThrown = true;
           }
           if (System.nanoTime() - updatesStart > 
maxTimeProcessingUpdatesNanos) {
             break;
@@ -434,15 +437,17 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
     }
 
     private boolean shouldShutdown() {
-      if (evaluationContext.isDone()) {
-        LOG.debug("Pipeline is finished. Shutting down. {}");
-        while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
-          visibleUpdates.poll();
+      boolean shouldShutdown = exceptionThrown || evaluationContext.isDone();
+      if (shouldShutdown) {
+        if (evaluationContext.isDone()) {
+          LOG.debug("Pipeline is finished. Shutting down. {}");
+          while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
+            visibleUpdates.poll();
+          }
         }
         executorService.shutdown();
-        return true;
       }
-      return false;
+      return shouldShutdown;
     }
 
     /**

Reply via email to