Repository: helix Updated Branches: refs/heads/helix-0.6.2-release c7b54e26b -> 977660d0d
[HELIX-381] ClusterStateVerifier can now work with a subset of resources Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/977660d0 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/977660d0 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/977660d0 Branch: refs/heads/helix-0.6.2-release Commit: 977660d0d15b5cda95705751620c36a52ebb4d7a Parents: c7b54e2 Author: Kanak Biscuitwala <[email protected]> Authored: Tue Feb 18 13:41:16 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Feb 19 17:38:04 2014 -0800 ---------------------------------------------------------------------- .../helix/tools/ClusterStateVerifier.java | 51 ++++++-- .../helix/tools/TestClusterStateVerifier.java | 128 +++++++++++++++++++ 2 files changed, 170 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/977660d0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java index 7ceee85..3a3a09e 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java @@ -19,13 +19,13 @@ package org.apache.helix.tools; * under the License. 
*/ -import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -41,10 +41,10 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; +import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; -import org.apache.helix.PropertyKey.Builder; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; import org.apache.helix.controller.stages.AttributeName; @@ -60,19 +60,18 @@ import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; -import org.apache.helix.participant.statemachine.StateModel; -import org.apache.helix.participant.statemachine.StateModelFactory; -import org.apache.helix.store.PropertyJsonComparator; -import org.apache.helix.store.PropertyJsonSerializer; import org.apache.helix.util.ZKClientPool; import org.apache.log4j.Logger; +import com.google.common.collect.Sets; + public class ClusterStateVerifier { public static String cluster = "cluster"; public static String zkServerAddress = "zkSvr"; public static String help = "help"; public static String timeout = "timeout"; public static String period = "period"; + public static String resources = "resources"; private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class); @@ -134,6 +133,7 @@ public class ClusterStateVerifier { private final String clusterName; private final Map<String, Map<String, String>> errStates; private final ZkClient zkClient; + private final Set<String> resources; public 
BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) { this(zkAddr, clusterName, null); @@ -141,6 +141,11 @@ public class ClusterStateVerifier { public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName, Map<String, Map<String, String>> errStates) { + this(zkAddr, clusterName, errStates, null); + } + + public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName, + Map<String, Map<String, String>> errStates, Set<String> resources) { if (zkAddr == null || clusterName == null) { throw new IllegalArgumentException("requires zkAddr|clusterName"); } @@ -148,6 +153,7 @@ public class ClusterStateVerifier { this.clusterName = clusterName; this.errStates = errStates; this.zkClient = ZKClientPool.getZkClient(zkAddr); // null; + this.resources = resources; } @Override @@ -156,7 +162,8 @@ public class ClusterStateVerifier { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); - return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates); + return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName, + resources); } catch (Exception e) { LOG.error("exception in verification", e); } @@ -222,7 +229,12 @@ public class ClusterStateVerifier { } static boolean verifyBestPossAndExtView(HelixDataAccessor accessor, - Map<String, Map<String, String>> errStates) { + Map<String, Map<String, String>> errStates, String clusterName) { + return verifyBestPossAndExtView(accessor, errStates, clusterName, null); + } + + static boolean verifyBestPossAndExtView(HelixDataAccessor accessor, + Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) { try { Builder keyBuilder = accessor.keyBuilder(); // read cluster once and do verification @@ -242,6 +254,12 @@ public class ClusterStateVerifier { extViews = Collections.emptyMap(); } + // Filter resources if requested + if (resources != null && !resources.isEmpty()) { + 
idealStates.keySet().retainAll(resources);
+        extViews.keySet().retainAll(resources);
+      }
+
       // if externalView is not empty and idealState doesn't exist
       // add empty idealState for the resource
       for (String resource : extViews.keySet()) {
@@ -618,12 +636,15 @@ public class ClusterStateVerifier {

     long timeoutValue = 0;
     long periodValue = 1000;
+    Set<String> resourceSet = null;
     if (args.length > 0) {
       CommandLine cmd = processCommandLineArgs(args);
       zkServer = cmd.getOptionValue(zkServerAddress);
       clusterName = cmd.getOptionValue(cluster);
       String timeoutStr = cmd.getOptionValue(timeout);
       String periodStr = cmd.getOptionValue(period);
+      String resourceStr = cmd.getOptionValue(resources);
+
       if (timeoutStr != null) {
         try {
           timeoutValue = Long.parseLong(timeoutStr);
@@ -641,12 +662,24 @@ public class ClusterStateVerifier {
         }
       }

+      // Allow specifying resources explicitly as a comma-separated list of resource names
+      if (resourceStr != null) {
+        String[] resourceNames = resourceStr.trim().split("\\s*,\\s*");
+        resourceSet = Sets.newHashSet(resourceNames);
+      }
+
     }
     // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
     // timeoutValue,
     // periodValue);

-    return verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkServer, clusterName), timeoutValue);
+    ZkVerifier verifier;
+    if (resourceSet == null) {
+      verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName);
+    } else {
+      verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet);
+    }
+    return verifyByZkCallback(verifier, timeoutValue);
   }

   public static void main(String[] args) {
http://git-wip-us.apache.org/repos/asf/helix/blob/977660d0/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
new file mode 100644
index 0000000..ae154d3
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java
@@ -0,0 +1,128 @@
+package org.apache.helix.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Sets;
+
+public class TestClusterStateVerifier extends ZkUnitTestBase { // whole-cluster and resource-subset verification tests
+  final String[] RESOURCES = { // names of the two resources added to the test cluster
+      "resource0", "resource1"
+  };
+  private HelixAdmin _admin;
+  private MockParticipantManager[] _participants; // participant i is the one assigned to RESOURCES[i]
+  private ClusterControllerManager _controller;
+  private String _clusterName;
+
+  @BeforeMethod
+  public void beforeMethod() throws InterruptedException {
+    final int NUM_PARTITIONS = 1;
+    final int NUM_REPLICAS = 1;
+
+    // Cluster and resource setup: two OnlineOffline resources in SEMI_AUTO mode, one partition each
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName(); // NOTE(review): called from beforeMethod(); confirm which method name this yields
+    _clusterName = className + "_" + methodName;
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    _admin = setupTool.getClusterManagementTool();
+    setupTool.addCluster(_clusterName, true);
+    setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS, "OnlineOffline",
+        RebalanceMode.SEMI_AUTO.toString());
+    setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS, "OnlineOffline",
+        RebalanceMode.SEMI_AUTO.toString());
+
+    // Configure and start one participant per resource, with ids localhost_12918 and localhost_12919
+    _participants = new MockParticipantManager[RESOURCES.length];
+    for (int i = 0; i < _participants.length; i++) {
+      String host = "localhost";
+      int port = 12918 + i;
+      String id = host + '_' + port;
+      setupTool.addInstanceToCluster(_clusterName, id);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, id);
+      _participants[i].syncStart();
+    }
+
+    // Rebalance the resources: list field "<resource>_0" of resource i names only participant i
+    for (int i = 0; i < RESOURCES.length; i++) {
+      IdealState idealState = _admin.getResourceIdealState(_clusterName, RESOURCES[i]);
+      idealState.setReplicas(Integer.toString(NUM_REPLICAS));
+      idealState.getRecord().setListField(RESOURCES[i] + "_0",
+          Arrays.asList(_participants[i].getInstanceName()));
+      _admin.setResourceIdealState(_clusterName, RESOURCES[i], idealState);
+    }
+
+    // Start the controller
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    _controller.syncStart();
+    Thread.sleep(1000); // NOTE(review): fixed 1s pause, presumably to let the controller converge - confirm
+  }
+
+  @AfterMethod
+  public void afterMethod() {
+    // Cleanup: stop the controller, then every participant, then drop the cluster
+    _controller.syncStop();
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+    }
+    _admin.dropCluster(_clusterName);
+  }
+
+  @Test
+  public void testEntireCluster() {
+    // Verify the whole cluster: the verifier is built without a resource filter
+    // and the callback-based verification is expected to succeed
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            _clusterName));
+    Assert.assertTrue(result);
+  }
+
+  @Test
+  public void testResourceSubset() throws InterruptedException {
+    // Ensure that this passes even when one resource is down (localhost_12918 is the participant for resource0)
+    _admin.enableInstance(_clusterName, "localhost_12918", false);
+    Thread.sleep(1000);
+    _admin.enableCluster(_clusterName, false); // NOTE(review): presumably keeps the controller from reacting to the re-enable below - confirm
+    _admin.enableInstance(_clusterName, "localhost_12918", true);
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+            _clusterName, null, Sets.newHashSet(RESOURCES[1]))); // verify resource1 only
+    Assert.assertTrue(result);
+
+    // But the full cluster verification should fail
+    boolean fullResult = new BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName).verify();
+    Assert.assertFalse(fullResult);
+    _admin.enableCluster(_clusterName, true); // re-enable the cluster before afterMethod() tears it down
+  }
+}
