Repository: helix Updated Branches: refs/heads/helix-0.6.x 2eaea9135 -> 45a9fd3a9
[HELIX-550] ZKHelixManager should shutdown GenericHelixController. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/af882ea0 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/af882ea0 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/af882ea0 Branch: refs/heads/helix-0.6.x Commit: af882ea025b1daf821f9f17f969a587ca7ec3e17 Parents: 961a9e1 Author: Antony T Curtis <[email protected]> Authored: Mon Nov 17 14:04:44 2014 -0800 Committer: Antony T Curtis <[email protected]> Committed: Mon Nov 17 16:15:53 2014 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 9 ++++++ .../apache/helix/manager/zk/ZKHelixManager.java | 34 +++++++++++++++++--- 2 files changed, 38 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/af882ea0/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 81c2e3d..5e5ad5b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -573,6 +573,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC } } + public void shutdown() throws InterruptedException { + stopRebalancingTimer(); + while (_eventThread.isAlive()) + { + _eventThread.interrupt(); + _eventThread.join(1000); + } + } + private class ClusterEventProcessor extends Thread { @Override public void run() { http://git-wip-us.apache.org/repos/asf/helix/blob/af882ea0/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index a6895cf..b61c100 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -125,7 +125,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { /** * controller fields */ - private final GenericHelixController _controller; + private GenericHelixController _controller; private CallbackHandler _leaderElectionHandler = null; protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>(); @@ -216,21 +216,18 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { */ switch (instanceType) { case PARTICIPANT: - _controller = null; _stateMachineEngine = new HelixStateMachineEngine(this); _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector)); break; case CONTROLLER: - _controller = new GenericHelixController(); _stateMachineEngine = null; _participantHealthInfoCollector = null; _controllerTimerTasks.add(new StatusDumpTask(this)); break; case CONTROLLER_PARTICIPANT: - _controller = new GenericHelixController(); _stateMachineEngine = new HelixStateMachineEngine(this); _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName); @@ -240,7 +237,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { break; case ADMINISTRATOR: case SPECTATOR: - _controller = null; _stateMachineEngine = null; _participantHealthInfoCollector = null; break; @@ -501,6 +497,21 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { return; } + switch (_instanceType) { + case CONTROLLER: + if (_controller == null) { + _controller = new GenericHelixController(); + } + break; + case CONTROLLER_PARTICIPANT: + if (_controller == null) { + _controller = new GenericHelixController(); + } + break; + default: + break; + } + try { createClient(); _messagingService.onConnected(); @@ -543,6 +554,19 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { _zkclient.close(); _zkclient = null; LOG.info("Cluster manager: " + _instanceName + " disconnected"); + + if (_controller != null) { + try { + _controller.shutdown(); + } + catch (InterruptedException e) { + LOG.info("Interrupted shutting down GenericHelixController", e); + } + finally { + _controller = null; + _leaderElectionHandler = null; + } + } } }
