Repository: incubator-beam Updated Branches: refs/heads/master 983d467a7 -> d6adbbf96
Shut down the InProcessPipelineRunner after Terminating Abnormally. Ensure that the executor service is shut down, and the monitor is not rescheduled, after an exception is thrown. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59113474 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59113474 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59113474 Branch: refs/heads/master Commit: 591134749f2d24a7d6550e0bb00845a36cdb1616 Parents: 2173000 Author: Thomas Groh <[email protected]> Authored: Wed Jun 8 12:00:05 2016 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Jun 8 12:00:05 2016 -0700 ---------------------------------------------------------------------- .../direct/ExecutorServiceParallelExecutor.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59113474/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 3129145..980d764 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -351,6 +351,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { evaluationContext.getPipelineOptions().getAppName(), ExecutorServiceParallelExecutor.class.getSimpleName()); + private boolean exceptionThrown = false; + @Override public void run() { String oldName = 
Thread.currentThread().getName(); @@ -366,6 +368,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { scheduleConsumers(update); } else if (update.getException().isPresent()) { visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get())); + exceptionThrown = true; } if (System.nanoTime() - updatesStart > maxTimeProcessingUpdatesNanos) { break; @@ -434,15 +437,17 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } private boolean shouldShutdown() { - if (evaluationContext.isDone()) { - LOG.debug("Pipeline is finished. Shutting down. {}"); - while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { - visibleUpdates.poll(); + boolean shouldShutdown = exceptionThrown || evaluationContext.isDone(); + if (shouldShutdown) { + if (evaluationContext.isDone()) { + LOG.debug("Pipeline is finished. Shutting down. {}"); + while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { + visibleUpdates.poll(); + } } executorService.shutdown(); - return true; } - return false; + return shouldShutdown; } /**
