Updated Branches: refs/heads/helix-0.6.2-release 339c1fe7a -> 73bacfda0
[HELIX-350]cluster status monitor should not be reset in FINALIZE type pipeline,rb=16772 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/73bacfda Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/73bacfda Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/73bacfda Branch: refs/heads/helix-0.6.2-release Commit: 73bacfda0bc48a6020d13b0931919976ecd1ab80 Parents: 339c1fe Author: slu2011 <[email protected]> Authored: Fri Jan 10 16:29:16 2014 -0800 Committer: slu2011 <[email protected]> Committed: Fri Jan 10 16:29:16 2014 -0800 ---------------------------------------------------------------------- .../TestClusterStatusMonitorLifecycle.java | 230 +++++++++++++++++++ 1 file changed, 230 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/73bacfda/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java new file mode 100644 index 0000000..65d60fb --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java @@ -0,0 +1,230 @@ +package org.apache.helix.monitoring; + +import java.io.IOException; +import java.util.Date; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerNotification; +import javax.management.MalformedObjectNameException; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.TestDistributedCMMain; +import org.apache.helix.integration.ZkIntegrationTestBase; +import 
org.apache.helix.integration.manager.ClusterDistributedController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Integration test that counts MBean register/unregister notifications in the
 * "ClusterStatus" JMX domain while a participant and the leader controller of the
 * first managed cluster are disconnected and brought back, and while a resource is
 * added to and dropped from that cluster.
 *
 * <p>The fixture consists of {@code clusterNb} storage clusters, one controller
 * cluster that manages them, {@code 2 * n} distributed controllers and {@code n}
 * participants attached to the first storage cluster.
 */
public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {

  MockParticipantManager[] _participants;
  ClusterDistributedController[] _controllers;
  String _controllerClusterName;
  String _clusterNamePrefix;
  String _firstClusterName;

  // number of nodes per cluster; also the size of each batch of controllers started
  final int n = 5;
  // number of storage clusters managed by the controller cluster
  final int clusterNb = 10;

  /**
   * Creates the storage clusters and the controller cluster, starts the first batch of
   * controllers and the participants of the first cluster, then adds and starts a second
   * batch of controllers. Each stage is verified against the best-possible/external-view
   * verifier before moving on.
   *
   * @throws Exception if cluster setup or any start-up step fails
   */
  @BeforeClass
  public void beforeClass() throws Exception {
    // NOTE(review): these TestHelper lookups presumably inspect the caller's stack frame,
    // so they are kept directly in this method rather than moved to a helper -- confirm.
    String className = TestHelper.getTestClassName();
    String methodName = TestHelper.getTestMethodName();
    _clusterNamePrefix = className + "_" + methodName;

    System.out
        .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));

    // setup 10 clusters
    for (int i = 0; i < clusterNb; i++) {
      String clusterName = _clusterNamePrefix + "0_" + i;
      String participantName = "localhost" + i;
      String resourceName = "TestDB" + i;
      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
          participantName, // participant name prefix
          resourceName, // resource name prefix
          1, // resources
          8, // partitions per resource
          n, // number of nodes
          3, // replicas
          "MasterSlave", true); // do rebalance
    }

    // setup controller cluster
    _controllerClusterName = "CONTROLLER_" + _clusterNamePrefix;
    TestHelper.setupCluster(_controllerClusterName, ZK_ADDR, 0, // controller port
        "controller", // participant name prefix
        _clusterNamePrefix, // resource name prefix
        1, // resources
        clusterNb, // partitions per resource
        n, // number of nodes
        3, // replicas
        "LeaderStandby", true); // do rebalance

    // start distributed cluster controllers (first batch: indices 0 .. n-1)
    _controllers = new ClusterDistributedController[n + n];
    for (int i = 0; i < n; i++) {
      _controllers[i] =
          new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
      _controllers[i].syncStart();
    }

    boolean result =
        ClusterStateVerifier.verifyByZkCallback(
            new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName),
            30000);
    Assert.assertTrue(result, "Controller cluster NOT in ideal state");

    // start first cluster
    _participants = new MockParticipantManager[n];
    _firstClusterName = _clusterNamePrefix + "0_0";
    for (int i = 0; i < n; i++) {
      String instanceName = "localhost0_" + (12918 + i);
      _participants[i] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
      _participants[i].syncStart();
    }

    result =
        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
            _firstClusterName));
    Assert.assertTrue(result, "first cluster NOT in ideal state");

    // add more controllers to controller cluster (second batch: indices n .. 2n-1)
    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
    for (int i = 0; i < n; i++) {
      String controller = "controller_" + (n + i);
      setupTool.addInstanceToCluster(_controllerClusterName, controller);
    }
    setupTool.rebalanceStorageCluster(_controllerClusterName, _clusterNamePrefix + "0", 6);
    for (int i = n; i < 2 * n; i++) {
      _controllers[i] =
          new ClusterDistributedController(ZK_ADDR, _controllerClusterName, "controller_" + i);
      _controllers[i].syncStart();
    }

    // verify controller cluster
    result =
        ClusterStateVerifier
            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
                _controllerClusterName));
    Assert.assertTrue(result, "Controller cluster NOT in ideal state");

    // verify first cluster
    result =
        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
            _firstClusterName));
    Assert.assertTrue(result, "first cluster NOT in ideal state");
  }

  /**
   * Stops every controller and participant started by {@link #beforeClass()}.
   *
   * <p>All {@code 2 * n} controller slots are visited; stopping only the first batch
   * would leave the second batch running after the class finishes.
   */
  @AfterClass
  public void afterClass() {
    System.out.println("Cleaning up...");

    if (_controllers != null) {
      for (ClusterDistributedController controller : _controllers) {
        if (controller == null) {
          continue; // slot never populated because set-up stopped early
        }
        // Wait for the controller cluster to converge before taking the next controller
        // away. The outcome is deliberately ignored: tear-down is best effort.
        ClusterStateVerifier
            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
                _controllerClusterName));
        controller.syncStop();
      }
    }

    if (_participants != null) {
      for (MockParticipantManager participant : _participants) {
        if (participant != null) {
          participant.syncStop();
        }
      }
    }

    System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
  }

  /**
   * MBean observer that only counts how many register and unregister notifications it
   * has received for the domain it was created with.
   */
  class ParticipantMonitorListener extends ClusterMBeanObserver {

    // NOTE(review): the callbacks are assumed to be able to run on a thread other than
    // the test thread that reads these counters -- confirm against ClusterMBeanObserver.
    // volatile makes updates visible to the reader; the synchronized callbacks keep
    // concurrent increments from being lost.
    volatile int _nMbeansUnregistered = 0;
    volatile int _nMbeansRegistered = 0;

    public ParticipantMonitorListener(String domain)
        throws InstanceNotFoundException, IOException,
        MalformedObjectNameException, NullPointerException {
      super(domain);
    }

    @Override
    public synchronized void onMBeanRegistered(MBeanServerConnection server,
        MBeanServerNotification mbsNotification) {
      _nMbeansRegistered++;
    }

    @Override
    public synchronized void onMBeanUnRegistered(MBeanServerConnection server,
        MBeanServerNotification mbsNotification) {
      _nMbeansUnregistered++;
    }
  }

  /**
   * Walks through participant and leader-controller churn plus a resource add/drop on the
   * first cluster, asserting after each step how many MBeans were registered and
   * unregistered relative to the counts taken at the start of the test.
   */
  @Test
  public void testClusterStatusMonitorLifecycle() throws InstanceNotFoundException,
      MalformedObjectNameException, NullPointerException, IOException, InterruptedException {
    ParticipantMonitorListener listener = new ParticipantMonitorListener("ClusterStatus");

    // baseline counts; every expectation below is an offset from these
    final int nMbeansUnregistered = listener._nMbeansUnregistered;
    final int nMbeansRegistered = listener._nMbeansRegistered;

    _participants[0].disconnect();

    // participant goes away. should be no change
    Thread.sleep(1000);
    Assert.assertEquals(listener._nMbeansUnregistered, nMbeansUnregistered,
        "unregistered MBean count after participant disconnect");
    Assert.assertEquals(listener._nMbeansRegistered, nMbeansRegistered,
        "registered MBean count after participant disconnect");

    HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
    String firstControllerName =
        accessor.getProperty(accessor.keyBuilder().controllerLeader()).getId();

    ClusterDistributedController firstController = null;
    for (ClusterDistributedController controller : _controllers) {
      if (controller.getInstanceName().equals(firstControllerName)) {
        firstController = controller;
        break;
      }
    }
    // fail with a readable message instead of a bare NullPointerException
    Assert.assertNotNull(firstController,
        "no started controller matches leader id " + firstControllerName);
    firstController.disconnect();
    Thread.sleep(1000);

    // 1 cluster status monitor and 1 resource monitor
    Assert.assertEquals(listener._nMbeansUnregistered, nMbeansUnregistered + 2,
        "unregistered MBean count after leader controller disconnect");
    Assert.assertEquals(listener._nMbeansRegistered, nMbeansRegistered + 2,
        "registered MBean count after leader controller disconnect");

    String instanceName = "localhost0_" + (12918 + 0);
    _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
    _participants[0].syncStart();

    // participant goes back. should be no change
    Thread.sleep(1000);
    Assert.assertEquals(listener._nMbeansUnregistered, nMbeansUnregistered + 2,
        "unregistered MBean count after participant rejoin");
    Assert.assertEquals(listener._nMbeansRegistered, nMbeansRegistered + 2,
        "registered MBean count after participant rejoin");

    // Add a resource, one more mbean registered
    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
    IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));

    setupTool.addResourceToCluster(_firstClusterName, "TestDB1", idealState.getNumPartitions(),
        "MasterSlave");
    setupTool.rebalanceResource(_firstClusterName, "TestDB1",
        Integer.parseInt(idealState.getReplicas()));

    Thread.sleep(1000);
    Assert.assertEquals(listener._nMbeansUnregistered, nMbeansUnregistered + 2,
        "unregistered MBean count after adding resource");
    Assert.assertEquals(listener._nMbeansRegistered, nMbeansRegistered + 3,
        "registered MBean count after adding resource");

    // remove resource, no change
    setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
    Thread.sleep(1000);
    Assert.assertEquals(listener._nMbeansUnregistered, nMbeansUnregistered + 2,
        "unregistered MBean count after dropping resource");
    Assert.assertEquals(listener._nMbeansRegistered, nMbeansRegistered + 3,
        "registered MBean count after dropping resource");
  }
}
