Repository: helix Updated Branches: refs/heads/helix-0.6.x de238d68e -> 8d464cf8f
[HELIX-537] Shutdown executors Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8d464cf8 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8d464cf8 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8d464cf8 Branch: refs/heads/helix-0.6.x Commit: 8d464cf8f7b021d102e2216837ec94f27067dc79 Parents: de238d6 Author: Antony T Curtis <[email protected]> Authored: Tue Nov 11 11:17:43 2014 -0800 Committer: Antony T Curtis <[email protected]> Committed: Tue Nov 11 20:47:38 2014 -0800 ---------------------------------------------------------------------- .../messaging/handling/HelixTaskExecutor.java | 2 ++ .../org/apache/helix/task/TaskStateModel.java | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8d464cf8/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index fec242f..59605e4 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -153,6 +153,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { if (prevExecutor != null) { LOG.warn("Skip creating a new thread pool for type: " + type + ", already existing pool: " + prevExecutor + ", isShutdown: " + prevExecutor.isShutdown()); + newPool.shutdown(); newPool = null; } LOG.info("Registered message handler factory for type: " + type + ", poolSize: " @@ -483,6 +484,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // Will happen if we register and call init LOG.info("Skip init a new thread pool for type: " + msgType + ", already existing pool: " + prevPool + ", isShutdown: " + prevPool.isShutdown()); + newPool.shutdown(); newPool = null; } } http://git-wip-us.apache.org/repos/asf/helix/blob/8d464cf8/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index 441bf79..9ca9ee9 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -25,6 +25,7 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; @@ -54,6 +55,25 @@ public class TaskStateModel extends StateModel { }); } + public boolean isShutdown() { + return _taskExecutor.isShutdown(); + } + + public boolean isTerminated() { + return _taskExecutor.isTerminated(); + } + + public void shutdown() { + reset(); + _taskExecutor.shutdown(); + _timer.cancel(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return _taskExecutor.awaitTermination(timeout, unit); + } + @Transition(to = "RUNNING", from = "INIT") public void onBecomeRunningFromInit(Message msg, NotificationContext context) { startTask(msg, msg.getPartitionName());
