Repository: helix Updated Branches: refs/heads/master ef6e9bbe0 -> c8a644f4b
[HELIX-381] ClusterStateVerifier can now work with a subset of resources Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c8a644f4 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c8a644f4 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c8a644f4 Branch: refs/heads/master Commit: c8a644f4b72871de0186cb85338ddd6c39929a51 Parents: ef6e9bb Author: Kanak Biscuitwala <[email protected]> Authored: Tue Feb 18 13:41:16 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Feb 19 17:32:26 2014 -0800 ---------------------------------------------------------------------- .../helix/tools/ClusterStateVerifier.java | 45 ++++++- .../helix/tools/TestClusterStateVerifier.java | 128 +++++++++++++++++++ 2 files changed, 169 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c8a644f4/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java index 1563769..da9d76e 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -53,10 +54,9 @@ import org.apache.helix.api.id.ResourceId; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; import org.apache.helix.controller.stages.AttributeName; -import org.apache.helix.controller.stages.ClusterDataCache; -import 
org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.BestPossibleStateOutput; +import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.ResourceComputationStage; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -69,12 +69,15 @@ import org.apache.helix.model.builder.ResourceAssignmentBuilder; import org.apache.helix.util.ZKClientPool; import org.apache.log4j.Logger; +import com.google.common.collect.Sets; + public class ClusterStateVerifier { public static String cluster = "cluster"; public static String zkServerAddress = "zkSvr"; public static String help = "help"; public static String timeout = "timeout"; public static String period = "period"; + public static String resources = "resources"; private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class); @@ -136,6 +139,7 @@ public class ClusterStateVerifier { private final String clusterName; private final Map<String, Map<String, String>> errStates; private final ZkClient zkClient; + private final Set<String> resources; public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) { this(zkAddr, clusterName, null); @@ -143,6 +147,11 @@ public class ClusterStateVerifier { public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName, Map<String, Map<String, String>> errStates) { + this(zkAddr, clusterName, errStates, null); + } + + public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName, + Map<String, Map<String, String>> errStates, Set<String> resources) { if (zkAddr == null || clusterName == null) { throw new IllegalArgumentException("requires zkAddr|clusterName"); } @@ -150,6 +159,7 @@ public class ClusterStateVerifier { this.clusterName = clusterName; this.errStates = errStates; this.zkClient = ZKClientPool.getZkClient(zkAddr); // null; + 
this.resources = resources; } @Override @@ -158,7 +168,8 @@ public class ClusterStateVerifier { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); - return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName); + return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName, + resources); } catch (Exception e) { LOG.error("exception in verification", e); } @@ -225,6 +236,11 @@ public class ClusterStateVerifier { static boolean verifyBestPossAndExtView(HelixDataAccessor accessor, Map<String, Map<String, String>> errStates, String clusterName) { + return verifyBestPossAndExtView(accessor, errStates, clusterName, null); + } + + static boolean verifyBestPossAndExtView(HelixDataAccessor accessor, + Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) { try { Builder keyBuilder = accessor.keyBuilder(); @@ -239,6 +255,12 @@ public class ClusterStateVerifier { extViews = Collections.emptyMap(); } + // Filter resources if requested + if (resources != null && !resources.isEmpty()) { + idealStates.keySet().retainAll(resources); + extViews.keySet().retainAll(resources); + } + // if externalView is not empty and idealState doesn't exist // add empty idealState for the resource for (String resource : extViews.keySet()) { @@ -624,12 +646,15 @@ public class ClusterStateVerifier { long timeoutValue = 0; long periodValue = 1000; + Set<String> resourceSet = null; if (args.length > 0) { CommandLine cmd = processCommandLineArgs(args); zkServer = cmd.getOptionValue(zkServerAddress); clusterName = cmd.getOptionValue(cluster); String timeoutStr = cmd.getOptionValue(timeout); String periodStr = cmd.getOptionValue(period); + String resourceStr = cmd.getOptionValue(resources); + if (timeoutStr != null) { try { timeoutValue = Long.parseLong(timeoutStr); @@ -647,12 +672,24 @@ public class ClusterStateVerifier { } } + // Allow specifying resources 
explicitly as a comma-separated list (blank value = all resources) + if (resourceStr != null && !resourceStr.trim().isEmpty()) { + String[] resourceNames = resourceStr.trim().split("\\s*,\\s*"); + resourceSet = Sets.newHashSet(resourceNames); + } + } // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName), // timeoutValue, // periodValue); - return verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkServer, clusterName), timeoutValue); + ZkVerifier verifier; + if (resourceSet == null) { + verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName); + } else { + verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet); + } + return verifyByZkCallback(verifier, timeoutValue); } public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/helix/blob/c8a644f4/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java new file mode 100644 index 0000000..9276dcd --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterStateVerifier.java @@ -0,0 +1,128 @@ +package org.apache.helix.tools; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Arrays; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; + +public class TestClusterStateVerifier extends ZkUnitTestBase { + final String[] RESOURCES = { + "resource0", "resource1" + }; + private HelixAdmin _admin; + private MockParticipantManager[] _participants; + private ClusterControllerManager _controller; + private String _clusterName; + + @BeforeMethod + public void beforeMethod() throws InterruptedException { + final int NUM_PARTITIONS = 1; + final int NUM_REPLICAS = 1; + + // Cluster and resource setup + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + _clusterName = className + "_" + methodName; + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + _admin = setupTool.getClusterManagementTool(); + setupTool.addCluster(_clusterName, true); + setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS, "OnlineOffline", + RebalanceMode.SEMI_AUTO.toString()); + setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS, "OnlineOffline", + RebalanceMode.SEMI_AUTO.toString()); + + // Configure and start the participants + _participants = new MockParticipantManager[RESOURCES.length]; + for (int i = 0; i < _participants.length; i++) { + String host = "localhost"; + int port 
= 12918 + i; + String id = host + '_' + port; + setupTool.addInstanceToCluster(_clusterName, id); + _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, id); + _participants[i].syncStart(); + } + + // Rebalance the resources + for (int i = 0; i < RESOURCES.length; i++) { + IdealState idealState = _admin.getResourceIdealState(_clusterName, RESOURCES[i]); + idealState.setReplicas(Integer.toString(NUM_REPLICAS)); + idealState.setPreferenceList(RESOURCES[i] + "_0", + Arrays.asList(_participants[i].getInstanceName())); + _admin.setResourceIdealState(_clusterName, RESOURCES[i], idealState); + } + + // Start the controller + _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0"); + _controller.syncStart(); + Thread.sleep(1000); + } + + @AfterMethod + public void afterMethod() { + // Cleanup + _controller.syncStop(); + for (MockParticipantManager participant : _participants) { + participant.syncStop(); + } + _admin.dropCluster(_clusterName); + } + + @Test + public void testEntireCluster() { + // Just ensure that the entire cluster passes + // ensure that the external view coalesces + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + _clusterName)); + Assert.assertTrue(result); + } + + @Test + public void testResourceSubset() throws InterruptedException { + // Ensure that this passes even when one resource is down + _admin.enableInstance(_clusterName, "localhost_12918", false); + Thread.sleep(1000); + _admin.enableCluster(_clusterName, false); + _admin.enableInstance(_clusterName, "localhost_12918", true); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + _clusterName, null, Sets.newHashSet(RESOURCES[1]))); + Assert.assertTrue(result); + + // But the full cluster verification should fail + boolean fullResult = new BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName).verify(); + Assert.assertFalse(fullResult); + 
_admin.enableCluster(_clusterName, true); + } +}
