Repository: helix Updated Branches: refs/heads/helix-0.6.x b72ff29d1 -> 456ddb0c4
[HELIX-613] Fix thread leaking problems in TaskStateModel by sharing one thread pool among all tasks and timeout tasks from TaskStateModels created from the same TaskStateModelFactory. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/456ddb0c Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/456ddb0c Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/456ddb0c Branch: refs/heads/helix-0.6.x Commit: 456ddb0c4a900ee7cdf081777eff9445378df513 Parents: b72ff29 Author: Lei Xia <[email protected]> Authored: Sat Nov 7 22:43:04 2015 -0800 Committer: Lei Xia <[email protected]> Committed: Sat Nov 7 22:43:04 2015 -0800 ---------------------------------------------------------------------- .../org/apache/helix/task/TaskStateModel.java | 55 +++++++++++--------- .../helix/task/TaskStateModelFactory.java | 33 ++++++++++-- 2 files changed, 60 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/456ddb0c/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index 9ca9ee9..30939fc 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -20,12 +20,8 @@ package org.apache.helix.task; */ import java.util.Map; -import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; @@ -39,20 +35,16 @@ import org.apache.log4j.Logger; public class TaskStateModel extends StateModel { private static final Logger LOG = Logger.getLogger(TaskStateModel.class); private final HelixManager _manager; - private final ExecutorService _taskExecutor; + private final ScheduledExecutorService _taskExecutor; private final Map<String, TaskFactory> _taskFactoryRegistry; - private final Timer _timer = new Timer("TaskStateModel time out daemon", true); + private ScheduledFuture timeout_task; private TaskRunner _taskRunner; - public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) { + public TaskStateModel(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry, + ScheduledExecutorService taskExecutor) { _manager = manager; _taskFactoryRegistry = taskFactoryRegistry; - _taskExecutor = Executors.newFixedThreadPool(40, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "TaskStateModel-thread-pool"); - } - }); + _taskExecutor = taskExecutor; } public boolean isShutdown() { @@ -65,8 +57,6 @@ public class TaskStateModel extends StateModel { public void shutdown() { reset(); - _taskExecutor.shutdown(); - _timer.cancel(); } public boolean awaitTermination(long timeout, TimeUnit unit) @@ -91,6 +81,8 @@ public class TaskStateModel extends StateModel { TaskResult r = _taskRunner.waitTillDone(); LOG.info(String.format("Task %s completed with result %s.", msg.getPartitionName(), r)); + timeout_task.cancel(false); + return r.getInfo(); } @@ -108,6 +100,8 @@ public class TaskStateModel extends StateModel { "Partition %s received a state transition to %s but the result status code is %s.", msg.getPartitionName(), msg.getToState(), r.getStatus())); } + + timeout_task.cancel(false); } @Transition(to = "TIMED_OUT", from = "RUNNING") @@ -125,6 +119,8 @@ public class TaskStateModel extends StateModel { msg.getPartitionName(), msg.getToState(), r.getStatus())); } + timeout_task.cancel(false); + return r.getInfo(); } @@ -143,6 +139,8 @@ public class TaskStateModel extends StateModel { msg.getPartitionName(), msg.getToState(), r.getStatus())); } + timeout_task.cancel(false); + return r.getInfo(); } @@ -153,7 +151,7 @@ public class TaskStateModel extends StateModel { @Transition(to = "DROPPED", from = "INIT") public void onBecomeDroppedFromInit(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "DROPPED", from = "RUNNING") @@ -168,34 +166,35 @@ public class TaskStateModel extends StateModel { TaskResult r = _taskRunner.waitTillDone(); LOG.info(String.format("Task partition %s returned result %s.", msg.getPartitionName(), r)); _taskRunner = null; + timeout_task.cancel(false); } @Transition(to = "DROPPED", from = "COMPLETED") public void onBecomeDroppedFromCompleted(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "DROPPED", from = "STOPPED") public void onBecomeDroppedFromStopped(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "DROPPED", from = "TIMED_OUT") public void onBecomeDroppedFromTimedOut(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "DROPPED", from = "TASK_ERROR") public void onBecomeDroppedFromTaskError(Message msg, NotificationContext context) { - _taskRunner = null; + reset(); } @Transition(to = "INIT", from = "RUNNING") public void onBecomeInitFromRunning(Message msg, NotificationContext context) { String taskPartition = msg.getPartitionName(); if (_taskRunner == null) { - throw new IllegalStateException(String.format( - "Invalid state transition. There is no running task for partition %s.", taskPartition)); + throw new IllegalStateException(String + .format("Invalid state transition. There is no running task for partition %s.", taskPartition)); } _taskRunner.cancel(); @@ -228,6 +227,11 @@ public class TaskStateModel extends StateModel { public void reset() { if (_taskRunner != null) { _taskRunner.cancel(); + _taskRunner = null; + } + if (timeout_task != null) { + timeout_task.cancel(false); + timeout_task = null; } } @@ -276,13 +280,14 @@ public class TaskStateModel extends StateModel { _taskRunner.waitTillStarted(); // Set up a timer to cancel the task when its time out expires. - _timer.schedule(new TimerTask() { + + timeout_task = _taskExecutor.schedule(new TimerTask() { @Override public void run() { if (_taskRunner != null) { _taskRunner.timeout(); } } - }, cfg.getTimeoutPerTask()); + }, cfg.getTimeoutPerTask(), TimeUnit.MILLISECONDS); } } http://git-wip-us.apache.org/repos/asf/helix/blob/456ddb0c/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java index b8e91f5..522c9e5 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java @@ -20,6 +20,9 @@ package org.apache.helix.task; */ import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import org.apache.helix.HelixManager; import org.apache.helix.participant.statemachine.StateModelFactory; @@ -30,14 +33,38 @@ import org.apache.helix.participant.statemachine.StateModelFactory; public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> { private final HelixManager _manager; private final Map<String, TaskFactory> _taskFactoryRegistry; + private final ScheduledExecutorService _taskExecutor; + private final static int TASK_THREADPOOL_SIZE = 40; public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry) { + this(manager, taskFactoryRegistry, + Executors.newScheduledThreadPool(TASK_THREADPOOL_SIZE, new ThreadFactory() { + @Override public Thread newThread(Runnable r) { + return new Thread(r, "TaskStateModel-thread-pool"); + } + })); + } + + public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory> taskFactoryRegistry, + ScheduledExecutorService taskExecutor) { _manager = manager; _taskFactoryRegistry = taskFactoryRegistry; + _taskExecutor = taskExecutor; + } + + @Override public TaskStateModel createNewStateModel(String resourceName, String partitionKey) { + return new TaskStateModel(_manager, _taskFactoryRegistry, _taskExecutor); + } + + public void shutdown() { + _taskExecutor.shutdown(); + } + + public boolean isShutdown() { + return _taskExecutor.isShutdown(); } - @Override - public TaskStateModel createNewStateModel(String resourceName, String partitionKey) { - return new TaskStateModel(_manager, _taskFactoryRegistry); + public boolean isTerminated() { + return _taskExecutor.isTerminated(); } }
