Repository: helix
Updated Branches:
  refs/heads/master 8e58aa5ad -> a0ab2b2e3


[HELIX-550] Shutdown GenericHelixController on disconnect


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bfb4a3d3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bfb4a3d3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bfb4a3d3

Branch: refs/heads/master
Commit: bfb4a3d34228f5c3806b1eee9e98f401386e66a9
Parents: 122ebf5
Author: Kanak Biscuitwala <[email protected]>
Authored: Tue Nov 18 21:23:29 2014 -0800
Committer: Kanak Biscuitwala <[email protected]>
Committed: Tue Nov 18 21:23:29 2014 -0800

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 12 ++-
 .../helix/manager/zk/ZkHelixController.java     | 77 ++++++++++++--------
 2 files changed, 55 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index aef636e..113cace 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -92,9 +92,9 @@ import com.google.common.collect.Lists;
  */
 public class GenericHelixController implements IdealStateChangeListener,
     LiveInstanceChangeListener, MessageListener, CurrentStateChangeListener,
-    ControllerChangeListener, InstanceConfigChangeListener,
-    ScopedConfigChangeListener {
+    ControllerChangeListener, InstanceConfigChangeListener, 
ScopedConfigChangeListener {
   private static final Logger logger = 
Logger.getLogger(GenericHelixController.class.getName());
+  private static final long EVENT_THREAD_JOIN_TIMEOUT = 1000L;
   volatile boolean init = false;
   private final PipelineRegistry _registry;
 
@@ -621,6 +621,14 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     }
   }
 
+  public void shutdown() throws InterruptedException {
+    stopRebalancingTimer();
+    while (_eventThread.isAlive()) {
+      _eventThread.interrupt();
+      _eventThread.join(EVENT_THREAD_JOIN_TIMEOUT);
+    }
+  }
+
   private class ClusterEventProcessor extends Thread {
     @Override
     public void run() {

http://git-wip-us.apache.org/repos/asf/helix/blob/bfb4a3d3/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
index 295b69c..fafe604 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkHelixController.java
@@ -47,24 +47,25 @@ import org.apache.log4j.Logger;
 public class ZkHelixController implements HelixController {
   private static Logger LOG = Logger.getLogger(ZkHelixController.class);
 
-  final ZkHelixConnection _connection;
-  final ClusterId _clusterId;
-  final ControllerId _controllerId;
-  final GenericHelixController _pipeline;
-  final DefaultMessagingService _messagingService;
-  final List<HelixTimerTask> _timerTasks;
-  final ClusterAccessor _clusterAccessor;
-  final HelixDataAccessor _accessor;
-  final HelixManager _manager;
-  final ZkHelixLeaderElection _leaderElection;
-  boolean _isStarted;
+  private final ZkHelixConnection _connection;
+  private final ClusterId _clusterId;
+  private final ControllerId _controllerId;
+  private final DefaultMessagingService _messagingService;
+  private final List<HelixTimerTask> _timerTasks;
+  @SuppressWarnings("unused")
+  private final ClusterAccessor _clusterAccessor;
+  private final HelixDataAccessor _accessor;
+  private final HelixManager _manager;
+  private boolean _isStarted;
+
+  private GenericHelixController _pipeline;
+  private ZkHelixLeaderElection _leaderElection;
 
   public ZkHelixController(ZkHelixConnection connection, ClusterId clusterId,
       ControllerId controllerId) {
     _connection = connection;
     _clusterId = clusterId;
     _controllerId = controllerId;
-    _pipeline = new GenericHelixController();
     _clusterAccessor = connection.createClusterAccessor(clusterId);
     _accessor = connection.createDataAccessor(clusterId);
 
@@ -72,7 +73,6 @@ public class ZkHelixController implements HelixController {
     _timerTasks = new ArrayList<HelixTimerTask>();
 
     _manager = new ZKHelixManager(this);
-    _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
 
     _timerTasks.add(new StatusDumpTask(clusterId, 
_manager.getHelixDataAccessor()));
   }
@@ -112,34 +112,47 @@ public class ZkHelixController implements HelixController 
{
   }
 
   void reset() {
-    /**
-     * reset all handlers, make sure cleanup completed for previous session
-     * disconnect if fail to cleanup
-     */
-    _connection.resetHandlers(this);
+    // clean up old pipeline instance
+    if (_leaderElection != null) {
+      _connection.removeListener(this, _leaderElection, 
_accessor.keyBuilder().controller());
+    }
+    if (_pipeline != null) {
+      try {
+        _pipeline.shutdown();
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted shutting down GenericHelixController", e);
+      } finally {
+        _pipeline = null;
+        _leaderElection = null;
+      }
+    }
 
+    // reset all handlers, make sure cleanup completed for previous session
+    // disconnect if fail to cleanup
+    _connection.resetHandlers(this);
   }
 
   void init() {
-    /**
-     * from here on, we are dealing with new session
-     * init handlers
-     */
+    // from here on, we are dealing with new session
+
+    // init handlers
     if (!ZKUtil.isClusterSetup(_clusterId.toString(), _connection._zkclient)) {
       throw new HelixException("Cluster structure is not set up for cluster: " 
+ _clusterId);
     }
 
-    /**
-     * leader-election listener should be reset/init before all other 
controller listeners;
-     * it's ok to add a listener multiple times, since we check existence in
-     * ZkHelixConnection#addXXXListner()
-     */
-    _connection.addControllerListener(this, _leaderElection, _clusterId);
+    // Recreate the pipeline on a new connection
+    if (_pipeline == null) {
+      _pipeline = new GenericHelixController();
+      _leaderElection = new ZkHelixLeaderElection(this, _pipeline);
 
-    /**
-     * ok to init message handler and controller handlers twice
-     * the second init will be skipped (see CallbackHandler)
-     */
+      // leader-election listener should be reset/init before all other 
controller listeners;
+      // it's ok to add a listener multiple times, since we check existence in
+      // ZkHelixConnection#addXXXListner()
+      _connection.addControllerListener(this, _leaderElection, _clusterId);
+    }
+
+    // ok to init message handler and controller handlers twice
+    // the second init will be skipped (see CallbackHandler)
     _connection.initHandlers(this);
   }
 

Reply via email to