Repository: helix Updated Branches: refs/heads/master 8e58aa5ad -> a0ab2b2e3
[HELIX-550] Shutdown GenericHelixController on disconnect Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bfb4a3d3 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bfb4a3d3 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bfb4a3d3 Branch: refs/heads/master Commit: bfb4a3d34228f5c3806b1eee9e98f401386e66a9 Parents: 122ebf5 Author: Kanak Biscuitwala <[email protected]> Authored: Tue Nov 18 21:23:29 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Tue Nov 18 21:23:29 2014 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 12 ++- .../helix/manager/zk/ZkHelixController.java | 77 ++++++++++++-------- 2 files changed, 55 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index aef636e..113cace 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -92,9 +92,9 @@ import com.google.common.collect.Lists; */ public class GenericHelixController implements IdealStateChangeListener, LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener, - ControllerChangeListener, InstanceConfigChangeListener, - ScopedConfigChangeListener { + ControllerChangeListener, InstanceConfigChangeListener, ScopedConfigChangeListener { private static final Logger logger = Logger.getLogger(GenericHelixController.class.getName()); + private static final long EVENT_THREAD_JOIN_TIMEOUT = 1000L; volatile boolean init = false; private final PipelineRegistry _registry; @@ -621,6 +621,14 @@ public class GenericHelixController implements IdealStateChangeListener, } } + public void shutdown() throws InterruptedException { + stopRebalancingTimer(); + while (_eventThread.isAlive()) { + _eventThread.interrupt(); + _eventThread.join(EVENT_THREAD_JOIN_TIMEOUT); + } + } + private class ClusterEventProcessor extends Thread { @Override public void run() { http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java index 295b69c..fafe604 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java @@ -47,24 +47,25 @@ import org.apache.log4j.Logger; public class ZkHelixController implements HelixController { private static Logger LOG = Logger.getLogger(ZkHelixController.class); - final ZkHelixConnection _connection; - final ClusterId _clusterId; - final ControllerId _controllerId; - final GenericHelixController _pipeline; - final DefaultMessagingService _messagingService; - final List<HelixTimerTask> _timerTasks; - final ClusterAccessor _clusterAccessor; - final HelixDataAccessor _accessor; - final HelixManager _manager; - final ZkHelixLeaderElection _leaderElection; - boolean _isStarted; + private final ZkHelixConnection _connection; + private final ClusterId _clusterId; + private final ControllerId _controllerId; + private final DefaultMessagingService _messagingService; + private final List<HelixTimerTask> _timerTasks; + @SuppressWarnings("unused") + private final ClusterAccessor _clusterAccessor; + private final HelixDataAccessor _accessor; + private final HelixManager _manager; + private boolean _isStarted; + + private GenericHelixController _pipeline; + private ZkHelixLeaderElection _leaderElection; public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId, ControllerId controllerId) { _connection = connection; _clusterId = clusterId; _controllerId = controllerId; - _pipeline = new GenericHelixController(); _clusterAccessor = connection.createClusterAccessor(clusterId); _accessor = connection.createDataAccessor(clusterId); @@ -72,7 +73,6 @@ public class ZkHelixController implements HelixController { _timerTasks = new ArrayList<HelixTimerTask>(); _manager = new ZKHelixManager(this); - _leaderElection = new ZkHelixLeaderElection(this, _pipeline); _timerTasks.add(new StatusDumpTask(clusterId, _manager.getHelixDataAccessor())); } @@ -112,34 +112,47 @@ public class ZkHelixController implements HelixController { } void reset() { - /** - * reset all handlers, make sure cleanup completed for previous session - * disconnect if fail to cleanup - */ - _connection.resetHandlers(this); + // clean up old pipeline instance + if (_leaderElection != null) { + _connection.removeListener(this, _leaderElection, _accessor.keyBuilder().controller()); + } + if (_pipeline != null) { + try { + _pipeline.shutdown(); + } catch (InterruptedException e) { + LOG.info("Interrupted shutting down GenericHelixController", e); + } finally { + _pipeline = null; + _leaderElection = null; + } + } + // reset all handlers, make sure cleanup completed for previous session + // disconnect if fail to cleanup + _connection.resetHandlers(this); } void init() { - /** - * from here on, we are dealing with new session - * init handlers - */ + // from here on, we are dealing with new session + + // init handlers if (!ZKUtil.isClusterSetup(_clusterId.toString(), _connection._zkclient)) { throw new HelixException("Cluster structure is not set up for cluster: " + _clusterId); } - /** - * leader-election listener should be reset/init before all other controller listeners; - * it's ok to add a listener multiple times, since we check existence in - * ZkHelixConnection#addXXXListner() - */ - _connection.addControllerListener(this, _leaderElection, _clusterId); + // Recreate the pipeline on a new connection + if (_pipeline == null) { + _pipeline = new GenericHelixController(); + _leaderElection = new ZkHelixLeaderElection(this, _pipeline); - /** - * ok to init message handler and controller handlers twice - * the second init will be skipped (see CallbackHandler) - */ + // leader-election listener should be reset/init before all other controller listeners; + // it's ok to add a listener multiple times, since we check existence in + // ZkHelixConnection#addXXXListner() + _connection.addControllerListener(this, _leaderElection, _clusterId); + } + + // ok to init message handler and controller handlers twice + // the second init will be skipped (see CallbackHandler) _connection.initHandlers(this); }
