Updated Branches: refs/heads/master 3b59bc39c -> 0e72af69c
[HELIX-350] cluster status monitor should not be reset in FINALIZE type pipeline,rb=16772 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0e72af69 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0e72af69 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0e72af69 Branch: refs/heads/master Commit: 0e72af69cf03beba75383676e079b54799c51d55 Parents: 3b59bc3 Author: slu2011 <[email protected]> Authored: Thu Jan 9 15:03:48 2014 -0800 Committer: slu2011 <[email protected]> Committed: Thu Jan 9 15:03:48 2014 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 14 +- .../stages/ExternalViewComputeStage.java | 24 +- .../DistClusterControllerElection.java | 8 +- .../TestClusterStatusMonitorLifecycle.java | 229 +++++++++++++++++++ 4 files changed, 253 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 9fef2da..b15627a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -250,11 +250,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC // Initialize _clusterStatusMonitor if (context != null) { if (context.getType() == Type.FINALIZE) { - if (_clusterStatusMonitor != null) { - _clusterStatusMonitor.reset(); - _clusterStatusMonitor = null; - } - stopRebalancingTimer(); logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName()); return; @@ -533,5 +528,12 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC _lastSeenSessions.set(curSessions); } - + + public void shutdownClusterStatusMonitor(String clusterName) { + if (_clusterStatusMonitor != null) { + logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName); + _clusterStatusMonitor.reset(); + _clusterStatusMonitor = null; + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index 977b661..dddb0c0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -35,6 +35,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.ZNRecordDelta; import org.apache.helix.ZNRecordDelta.MergeOperation; import org.apache.helix.api.Cluster; +import org.apache.helix.api.Resource; import org.apache.helix.api.State; import org.apache.helix.api.config.ResourceConfig; import org.apache.helix.api.config.SchedulerTaskConfig; @@ -45,11 +46,13 @@ import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.controller.rebalancer.config.RebalancerConfig; +import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageType; import org.apache.helix.model.StatusUpdate; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.log4j.Logger; public class ExternalViewComputeStage extends AbstractBaseStage { @@ -113,19 +116,16 @@ public class ExternalViewComputeStage extends AbstractBaseStage { } } - // TODO fix this // Update cluster status monitor mbean - // ClusterStatusMonitor clusterStatusMonitor = - // (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"); - // IdealState idealState = cache._idealStateMap.get(view.getResourceName()); - // if (idealState != null) { - // if (clusterStatusMonitor != null - // && !idealState.getStateModelDefRef().equalsIgnoreCase( - // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { - // clusterStatusMonitor.onExternalViewChange(view, - // cache._idealStateMap.get(view.getResourceName())); - // } - // } + ClusterStatusMonitor clusterStatusMonitor = (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor"); + Resource currentResource = cluster.getResourceMap().get(view.getResourceId()); + if (currentResource != null) { + IdealState idealState = currentResource.getIdealState(); + if (clusterStatusMonitor != null && + !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { + clusterStatusMonitor.onExternalViewChange(view, idealState); + } + } // compare the new external view with current one, set only on different ExternalView curExtView = curExtViews.get(resourceId.stringify()); http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java index 0e8c6fd..45bee64 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerElection.java @@ -95,13 +95,13 @@ public class DistClusterControllerElection implements ControllerChangeListener { } } - } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) { - - if (_leader != null) { + } + else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) { + if(_leader != null) { _leader.disconnect(); } + _controller.shutdownClusterStatusMonitor(manager.getClusterName()); } - } catch (Exception e) { LOG.error("Exception when trying to become leader", e); } http://git-wip-us.apache.org/repos/asf/helix/blob/0e72af69/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java new file mode 100644 index 0000000..373b024 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java @@ -0,0 +1,229 @@ +package org.apache.helix.monitoring; + +import java.io.IOException; +import java.util.Date; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerNotification; +import javax.management.MalformedObjectNameException; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.TestDistributedCMMain; +import org.apache.helix.integration.manager.ClusterDistributedController; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestClusterStatusMonitorLifecycle extends TestDistributedCMMain{ + + MockParticipantManager[] _participants; + ClusterDistributedController[] _controllers; + String _controllerClusterName; + String _clusterNamePrefix; + String _firstClusterName; + + final int n = 5; + final int clusterNb = 10; + + @BeforeClass + public void testDistributedCMMain() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + _clusterNamePrefix = className + "_" + methodName; + + System.out + .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis())); + + // setup 10 clusters + for (int i = 0; i < clusterNb; i++) { + String clusterName = _clusterNamePrefix + "0_" + i; + String participantName = "localhost" + i; + String resourceName = "TestDB" + i; + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + participantName, // participant name prefix + resourceName, // resource name prefix + 1, // resources + 8, // partitions per resource + n, // number of nodes + 3, // replicas + "MasterSlave", true); // do rebalance + } + + // setup controller cluster + _controllerClusterName = "CONTROLLER_" + _clusterNamePrefix; + TestHelper.setupCluster("CONTROLLER_" + _clusterNamePrefix, ZK_ADDR, 0, // controller + // port + "controller", // participant name prefix + _clusterNamePrefix, // resource name prefix + 1, // resources + clusterNb, // partitions per resource + n, // number of nodes + 3, // replicas + "LeaderStandby", true); // do rebalance + + // start distributed cluster controllers + _controllers = new ClusterDistributedController[n + n]; + for (int i = 0; i < n; i++) { + _controllers[i] = + new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i); + _controllers[i].syncStart(); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback( + new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName), + 30000); + Assert.assertTrue(result, "Controller cluster NOT in ideal state"); + + // start first cluster + _participants = new MockParticipantManager[n]; + _firstClusterName = _clusterNamePrefix + "0_0"; + for (int i = 0; i < n; i++) { + String instanceName = "localhost0_" + (12918 + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName); + _participants[i].syncStart(); + } + + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + _firstClusterName)); + Assert.assertTrue(result, "first cluster NOT in ideal state"); + + // add more controllers to controller cluster + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + for (int i = 0; i < n; i++) { + String controller = "controller_" + (n + i); + setupTool.addInstanceToCluster(_controllerClusterName, controller); + } + setupTool.rebalanceStorageCluster(_controllerClusterName, _clusterNamePrefix + "0", 6); + for (int i = n; i < 2 * n; i++) { + _controllers[i] = + new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i); + _controllers[i].syncStart(); + } + + // verify controller cluster + result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + _controllerClusterName)); + Assert.assertTrue(result, "Controller cluster NOT in ideal state"); + + // verify first cluster + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + _firstClusterName)); + Assert.assertTrue(result, "first cluster NOT in ideal state"); + } + + @AfterClass + public void afterClass(){ + System.out.println("Cleaning up..."); + for (int i = 0; i < 5; i++) { + boolean result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + _controllerClusterName)); + _controllers[i].syncStop(); + } + + for (int i = 0; i < 5; i++) { + _participants[i].syncStop(); + } + + System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis())); + + } + + class ParticipantMonitorListener extends ClusterMBeanObserver { + + int _nMbeansUnregistered = 0; + int _nMbeansRegistered = 0; + public ParticipantMonitorListener(String domain) + throws InstanceNotFoundException, IOException, + MalformedObjectNameException, NullPointerException { + super(domain); + } + + @Override + public void onMBeanRegistered(MBeanServerConnection server, + MBeanServerNotification mbsNotification) { + _nMbeansRegistered ++; + } + + @Override + public void onMBeanUnRegistered(MBeanServerConnection server, + MBeanServerNotification mbsNotification) { + _nMbeansUnregistered++; + }} + + @Test + public void testClusterStatusMonitorLifecycle() throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException, IOException, InterruptedException{ + ParticipantMonitorListener listener = new ParticipantMonitorListener("ClusterStatus"); + + int nMbeansUnregistered = listener._nMbeansUnregistered; + int nMbeansRegistered = listener._nMbeansRegistered; + + _participants[0].disconnect(); + + // participant goes away. should be no change + Thread.sleep(1000); + Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered); + Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered); + + HelixDataAccessor accessor = _participants[n-1].getHelixDataAccessor(); + String firstControllerName = accessor.getProperty(accessor.keyBuilder().controllerLeader()).getId(); + + ClusterDistributedController firstController = null; + for(ClusterDistributedController controller : _controllers) + { + if(controller.getInstanceName().equals(firstControllerName)) + { + firstController = controller; + } + } + firstController.disconnect(); + Thread.sleep(1000); + + // 1 cluster status monitor and 1 resource monitor + Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2); + Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 2); + + String instanceName = "localhost0_" + (12918 + 0); + _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName); + _participants[0].syncStart(); + + // participant goes back. should be no change + Thread.sleep(1000); + Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2); + Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 2); + + // Add a resource, one more mbean registered + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00")); + + setupTool.addResourceToCluster(_firstClusterName, "TestDB1", idealState.getNumPartitions(), "MasterSlave"); + setupTool.rebalanceResource(_firstClusterName, "TestDB1", Integer.parseInt(idealState.getReplicas())); + + Thread.sleep(1000); + Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2); + Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 3); + + // remove resource, no change + setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1"); + Thread.sleep(1000); + Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 2); + Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 3); + + + } +}
