Repository: apex-core Updated Branches: refs/heads/master bfc1eb874 -> e924284ee
APEXCORE-535 - Node.teardown() should try to gracefully shutdown exectutor service Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/e924284e Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/e924284e Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/e924284e Branch: refs/heads/master Commit: e924284ee026790bf60a0254d4bcdcf822a665ba Parents: bfc1eb8 Author: Vlad Rozov <[email protected]> Authored: Wed Sep 14 22:06:36 2016 -0700 Committer: Vlad Rozov <[email protected]> Committed: Wed Sep 14 22:06:36 2016 -0700 ---------------------------------------------------------------------- .../main/java/com/datatorrent/stram/engine/Node.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/e924284e/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index 0b56bce..4a5cbde 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -43,6 +43,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,7 +210,17 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera } if (executorService != null) { - executorService.shutdownNow(); + executorService.shutdown(); + boolean terminated = false; + try { + terminated = executorService.awaitTermination(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.debug("Wait for graceful executor service {} shutdown interrupted for node {}", executorService, this, e); + } + if (!terminated) { + logger.warn("Shutting down executor service {} for node {}", executorService, this); + executorService.shutdownNow(); + } } operator.teardown(); }
