package org.apache.helix.integration.messaging;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.helix.ControllerChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.integration.common.ZkIntegrationTestBase;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

// test case from Ming Fang
/**
 * Brings up a two-node MasterSlave resource under a message constraint that limits the cluster
 * to one state-transition message at a time, starts node2 before node1, and checks that the
 * cluster still converges.
 */
public class TestMessageThrottle2 extends ZkIntegrationTestBase {
  final static String clusterName = "TestMessageThrottle2";
  final static String resourceName = "MyResource";

  /**
   * Starts the admin setup, the controller and node2, waits for node2 to report MASTER, then
   * starts node1 and verifies the external view against the best possible state.
   */
  @Test
  public void test() throws Exception {
    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));

    startAdmin();
    startController();

    // start node2 first
    Node.main(new String[] {
      "2"
    });

    // wait for node2 becoming MASTER
    final Builder keyBuilder = new Builder(clusterName);
    final HelixDataAccessor accessor =
        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
    boolean node2IsMaster = TestHelper.verify(new TestHelper.Verifier() {

      @Override
      public boolean verify() throws Exception {
        ExternalView view = accessor.getProperty(keyBuilder.externalView(resourceName));
        String state = null;

        if (view != null) {
          Map<String, String> map = view.getStateMap(resourceName);
          if (map != null) {
            state = map.get("node2");
          }
        }
        return "MASTER".equals(state);
      }
    }, 10 * 1000);
    // Fail here with a clear message instead of silently continuing after a timed-out wait.
    Assert.assertTrue(node2IsMaster, "node2 did not become MASTER before the wait timed out");

    // start node 1
    Node.main(new String[] {
      "1"
    });

    boolean result =
        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
            clusterName));
    Assert.assertTrue(result);

    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
  }

  /** Starts a standalone Helix controller for the test cluster and attaches a StatusPrinter. */
  void startController() throws Exception {
    // start helixController
    System.out.println(String.format("Starting Controller{Cluster:%s, Port:%s, Zookeeper:%s}",
        clusterName, 12000, ZK_ADDR));
    HelixManager helixController =
        HelixControllerMain.startHelixController(ZK_ADDR, clusterName, "localhost_" + 12000,
            HelixControllerMain.STANDALONE);

    StatusPrinter statusPrinter = new StatusPrinter();
    statusPrinter.registerWith(helixController);
  }

  /**
   * Creates the cluster, registers the MasterSlave state model, writes the ideal state for the
   * single-partition resource and installs the message constraint.
   */
  void startAdmin() throws Exception {
    HelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);

    // create cluster
    System.out.println("Creating cluster: " + clusterName);
    admin.addCluster(clusterName, true);

    // add MasterSlave state mode definition
    admin.addStateModelDef(clusterName, "MasterSlave", new StateModelDefinition(
        generateConfigForMasterSlave()));

    // ideal-state znrecord
    ZNRecord record = new ZNRecord(resourceName);
    record.setSimpleField("IDEAL_STATE_MODE", "AUTO");
    record.setSimpleField("NUM_PARTITIONS", "1");
    record.setSimpleField("REPLICAS", "2");
    record.setSimpleField("STATE_MODEL_DEF_REF", "MasterSlave");
    record.setListField(resourceName, Arrays.asList("node1", "node2"));

    admin.setResourceIdealState(clusterName, resourceName, new IdealState(record));

    ConstraintItemBuilder builder = new ConstraintItemBuilder();

    // limit one transition message at a time across the entire cluster
    builder.addConstraintAttribute("MESSAGE_TYPE", "STATE_TRANSITION")
        // .addConstraintAttribute("INSTANCE", ".*") // un-comment this line if using
        // instance-level constraint
        .addConstraintAttribute("CONSTRAINT_VALUE", "1");
    admin.setConstraint(clusterName, ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT,
        "constraint1", builder.build());
  }

  /**
   * Builds the ZNRecord backing the "MasterSlave" state model definition: initial state, state
   * priority list, per-state counts, next-state tables and the transition priority list.
   *
   * @return the populated record
   */
  ZNRecord generateConfigForMasterSlave() {
    ZNRecord record = new ZNRecord("MasterSlave");
    record.setSimpleField(
        StateModelDefinition.StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE");

    List<String> statePriorityList = new ArrayList<String>();
    statePriorityList.add("MASTER");
    statePriorityList.add("SLAVE");
    statePriorityList.add("OFFLINE");
    statePriorityList.add("DROPPED");
    statePriorityList.add("ERROR");
    record.setListField(
        StateModelDefinition.StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
        statePriorityList);

    // Per-state "count" metadata: MASTER is "1", SLAVE is "R", every other listed state is "-1".
    for (String state : statePriorityList) {
      String count;
      if (state.equals("MASTER")) {
        count = "1";
      } else if (state.equals("SLAVE")) {
        count = "R";
      } else {
        count = "-1";
      }
      Map<String, String> metadata = new HashMap<String, String>();
      metadata.put("count", count);
      record.setMapField(state + ".meta", metadata);
    }

    // Next-state tables: for a current state, maps a target state to the next hop.
    // DROPPED deliberately has no ".next" entry.
    Map<String, String> masterNext = new HashMap<String, String>();
    masterNext.put("SLAVE", "SLAVE");
    masterNext.put("OFFLINE", "SLAVE");
    masterNext.put("DROPPED", "SLAVE");
    record.setMapField("MASTER.next", masterNext);

    Map<String, String> slaveNext = new HashMap<String, String>();
    slaveNext.put("MASTER", "MASTER");
    slaveNext.put("OFFLINE", "OFFLINE");
    slaveNext.put("DROPPED", "OFFLINE");
    record.setMapField("SLAVE.next", slaveNext);

    Map<String, String> offlineNext = new HashMap<String, String>();
    offlineNext.put("SLAVE", "SLAVE");
    offlineNext.put("MASTER", "SLAVE");
    offlineNext.put("DROPPED", "DROPPED");
    record.setMapField("OFFLINE.next", offlineNext);

    Map<String, String> errorNext = new HashMap<String, String>();
    errorNext.put("OFFLINE", "OFFLINE");
    record.setMapField("ERROR.next", errorNext);

    // change the transition priority list
    List<String> stateTransitionPriorityList = new ArrayList<String>();
    stateTransitionPriorityList.add("SLAVE-MASTER");
    stateTransitionPriorityList.add("OFFLINE-SLAVE");
    stateTransitionPriorityList.add("MASTER-SLAVE");
    stateTransitionPriorityList.add("SLAVE-OFFLINE");
    stateTransitionPriorityList.add("OFFLINE-DROPPED");
    record.setListField(
        StateModelDefinition.StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
        stateTransitionPriorityList);
    return record;
  }

  /** A participant that connects to the test cluster under a given instance name. */
  static final class MyProcess {
    private final String instanceName;
    private HelixManager helixManager;

    public MyProcess(String instanceName) {
      this.instanceName = instanceName;
    }

    /**
     * Creates the participant manager, registers the MasterSlave state model factory, connects
     * and attaches a StatusPrinter.
     */
    public void start() throws Exception {
      helixManager =
          new ZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, ZK_ADDR);
      {
        // hack to set sessionTimeout: overwrite the private _sessionTimeout field via reflection
        Field sessionTimeout = ZKHelixManager.class.getDeclaredField("_sessionTimeout");
        sessionTimeout.setAccessible(true);
        sessionTimeout.setInt(helixManager, 1000);
      }

      StateMachineEngine stateMach = helixManager.getStateMachineEngine();
      stateMach.registerStateModelFactory("MasterSlave", new MyStateModelFactory(helixManager));
      helixManager.connect();

      StatusPrinter statusPrinter = new StatusPrinter();
      statusPrinter.registerWith(helixManager);
    }

    /** Disconnects the participant; a no-op if {@link #start()} was never called. */
    public void stop() {
      if (helixManager != null) {
        helixManager.disconnect();
      }
    }
  }

  /** State model whose transition handlers only log the transition they were invoked for. */
  @StateModelInfo(initialState = "OFFLINE", states = {
      "MASTER", "SLAVE", "ERROR"
  })
  public static class MyStateModel extends StateModel {
    private static final Logger LOGGER = Logger.getLogger(MyStateModel.class);

    private final HelixManager helixManager;

    public MyStateModel(HelixManager helixManager) {
      this.helixManager = helixManager;
    }

    @Transition(to = "SLAVE", from = "OFFLINE")
    public void onBecomeSlaveFromOffline(Message message, NotificationContext context) {
      logTransition(message, "OFFLINE", "SLAVE");
    }

    @Transition(to = "SLAVE", from = "MASTER")
    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
      logTransition(message, "MASTER", "SLAVE");
    }

    @Transition(to = "MASTER", from = "SLAVE")
    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
      logTransition(message, "SLAVE", "MASTER");
    }

    @Transition(to = "OFFLINE", from = "SLAVE")
    public void onBecomeOfflineFromSlave(Message message, NotificationContext context) {
      logTransition(message, "SLAVE", "OFFLINE");
    }

    @Transition(to = "DROPPED", from = "OFFLINE")
    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
      logTransition(message, "OFFLINE", "DROPPED");
    }

    @Transition(to = "OFFLINE", from = "ERROR")
    public void onBecomeOfflineFromError(Message message, NotificationContext context) {
      logTransition(message, "ERROR", "OFFLINE");
    }

    /** Logs "<target instance> becomes <to> from <from> for <partition>". */
    private static void logTransition(Message message, String from, String to) {
      String partitionName = message.getPartitionName();
      String instanceName = message.getTgtName();
      LOGGER.info(instanceName + " becomes " + to + " from " + from + " for " + partitionName);
    }
  }

  /** Factory that hands every partition a fresh {@link MyStateModel}. */
  static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
    private final HelixManager helixManager;

    public MyStateModelFactory(HelixManager helixManager) {
      this.helixManager = helixManager;
    }

    @Override
    public MyStateModel createNewStateModel(String resource, String partitionName) {
      return new MyStateModel(helixManager);
    }
  }

  /** Launcher that registers instance "node&lt;id&gt;" with the cluster and starts it. */
  static class Node {
    private static final Logger LOGGER = Logger.getLogger(Node.class);

    /**
     * @param args a single element: the numeric node id
     * @throws IllegalArgumentException if no id is supplied
     * @throws NumberFormatException if the id is not an integer
     */
    public static void main(String[] args) throws Exception {
      if (args.length < 1) {
        LOGGER.info("usage: id");
        // Throw rather than System.exit(): this runs inside the test JVM.
        throw new IllegalArgumentException("usage: id");
      }
      int id = Integer.parseInt(args[0]);
      String instanceName = "node" + id;

      addInstanceConfig(instanceName);
      startProcess(instanceName);
    }

    /** Adds an enabled InstanceConfig for the instance unless one can already be read. */
    private static void addInstanceConfig(String instanceName) {
      // add node to cluster if not already added
      ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);

      InstanceConfig instanceConfig = null;
      try {
        instanceConfig = admin.getInstanceConfig(clusterName, instanceName);
      } catch (Exception e) {
        // Best effort: a failed lookup is handled like a missing config, but leave a trace.
        LOGGER.info("Could not read InstanceConfig for " + instanceName
            + "; treating it as not yet added", e);
      }
      if (instanceConfig == null) {
        InstanceConfig config = new InstanceConfig(instanceName);
        config.setHostName("localhost");
        config.setInstanceEnabled(true);
        echo("Adding InstanceConfig:" + config);
        admin.addInstance(clusterName, config);
      }
    }

    public static void echo(Object obj) {
      LOGGER.info(obj);
    }

    private static void startProcess(String instanceName) throws Exception {
      MyProcess process = new MyProcess(instanceName);
      process.start();
    }
  }

  /** Listener that prints every notification it receives to standard output. */
  static class StatusPrinter implements IdealStateChangeListener, InstanceConfigChangeListener,
      ExternalViewChangeListener, LiveInstanceChangeListener, ControllerChangeListener {

    private HelixManager helixManager;

    @Override
    public void onControllerChange(NotificationContext changeContext) {
      System.out.println("StatusPrinter.onControllerChange:" + changeContext);
    }

    @Override
    public void onExternalViewChange(List<ExternalView> externalViewList,
        NotificationContext changeContext) {
      for (ExternalView externalView : externalViewList) {
        System.out
            .println("StatusPrinter.onExternalViewChange:" + "externalView = " + externalView);
      }
    }

    @Override
    public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext) {
      for (IdealState state : idealState) {
        System.out.println("StatusPrinter.onIdealStateChange:" + "state = " + state);
      }
    }

    @Override
    public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
        NotificationContext context) {
      for (InstanceConfig instanceConfig : instanceConfigs) {
        System.out.println("StatusPrinter.onInstanceConfigChange:" + "instanceConfig = "
            + instanceConfig);
      }
    }

    @Override
    public void onLiveInstanceChange(List<LiveInstance> liveInstances,
        NotificationContext changeContext) {
      for (LiveInstance liveInstance : liveInstances) {
        System.out
            .println("StatusPrinter.onLiveInstanceChange:" + "liveInstance = " + liveInstance);
      }
    }

    /** Subscribes this printer to all five listener types on the given manager. */
    public void registerWith(HelixManager helixManager) throws Exception {
      this.helixManager = helixManager;
      helixManager.addIdealStateChangeListener(this);
      helixManager.addInstanceConfigChangeListener(this);
      helixManager.addExternalViewChangeListener(this);
      helixManager.addLiveInstanceChangeListener(this);
      helixManager.addControllerListener(this);
    }
  }
}
package org.apache.helix.integration.paticipant;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.Date;

import org.apache.helix.TestHelper;
import org.apache.helix.integration.common.ZkIntegrationTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockBootstrapModelFactory;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Checks that a cluster converges when its participants register the "Bootstrap" state model
 * (described in the original comments as having a non-OFFLINE initial state).
 */
public class TestNonOfflineInitState extends ZkIntegrationTestBase {
  private static Logger LOG = Logger.getLogger(TestNonOfflineInitState.class);

  /**
   * Sets up a five-node cluster with one ten-partition resource, starts a controller and the
   * participants, and asserts that the external view matches the best possible state.
   */
  @Test
  public void testNonOfflineInitState() throws Exception {
    System.out.println("START testNonOfflineInitState at " + new Date(System.currentTimeMillis()));
    String clusterName = getShortClassName();

    // participant port, participant name prefix, resource name prefix, resources,
    // partitions per resource, number of nodes, replicas, state model, do rebalance
    setupCluster(clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5, 1, "Bootstrap",
        true);

    ClusterControllerManager controller =
        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
    controller.syncStart();

    // start participants
    MockParticipantManager[] participants = new MockParticipantManager[5];
    for (int idx = 0; idx < 5; idx++) {
      MockParticipantManager participant =
          new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + idx));
      participants[idx] = participant;

      // add a state model with non-OFFLINE initial state
      StateMachineEngine engine = participant.getStateMachineEngine();
      engine.registerStateModelFactory("Bootstrap", new MockBootstrapModelFactory());

      participant.syncStart();
    }

    BestPossAndExtViewZkVerifier verifier = new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName);
    boolean result = ClusterStateVerifier.verifyByZkCallback(verifier);
    Assert.assertTrue(result);

    // clean up
    controller.syncStop();
    for (MockParticipantManager participant : participants) {
      participant.syncStop();
    }

    System.out.println("END testNonOfflineInitState at " + new Date(System.currentTimeMillis()));
  }

  /**
   * Recreates the cluster from scratch: deletes any existing cluster node, adds the cluster and
   * the "Bootstrap" state model definition, then adds the instances and resources, rebalancing
   * each resource when requested.
   */
  private static void setupCluster(String clusterName, String ZkAddr, int startPort,
      String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
      int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
    String clusterPath = "/" + clusterName;
    if (_gZkClient.exists(clusterPath)) {
      LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
      _gZkClient.deleteRecursive(clusterPath);
    }

    ClusterSetup setupTool = new ClusterSetup(ZkAddr);
    setupTool.addCluster(clusterName, true);
    setupTool.addStateModelDef(clusterName, "Bootstrap",
        TestHelper.generateStateModelDefForBootstrap());

    for (int node = 0; node < nodesNb; node++) {
      String instanceName = participantNamePrefix + "_" + (startPort + node);
      setupTool.addInstanceToCluster(clusterName, instanceName);
    }

    for (int res = 0; res < resourceNb; res++) {
      String dbName = resourceNamePrefix + res;
      setupTool.addResourceToCluster(clusterName, dbName, partitionNb, stateModelDef);
      if (!doRebalance) {
        continue;
      }
      setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
    }
  }

}
package org.apache.helix.integration.paticipant;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.UUID;

import org.apache.helix.Criteria;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory;
import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Sends PARTICIPANT_ERROR_REPORT messages to the controller and checks the resulting external
 * view of "TestDB".
 */
public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
  private static Logger LOG = Logger.getLogger(TestParticipantErrorMessage.class);

  /**
   * Participant 0 reports with action DISABLE_INSTANCE and participant 1 reports with action
   * DISABLE_PARTITION for "TestDB_14". After the cluster converges, every replica on the first
   * participant and the "TestDB_14" replica on the second must be OFFLINE.
   */
  @Test()
  public void TestParticipantErrorMessageSend() {
    String participant1 = "localhost_" + START_PORT;
    String participant2 = "localhost_" + (START_PORT + 1);

    Message errorMessage1 = newErrorReport(ActionOnError.DISABLE_INSTANCE);
    _participants[0].getMessagingService().send(newControllerCriteria(), errorMessage1);

    Message errorMessage2 = newErrorReport(ActionOnError.DISABLE_PARTITION);
    errorMessage2.setResourceName("TestDB");
    errorMessage2.setPartitionName("TestDB_14");
    _participants[1].getMessagingService().send(newControllerCriteria(), errorMessage2);

    try {
      Thread.sleep(1500);
    } catch (InterruptedException e) {
      LOG.error("Interrupted sleep", e);
      // Restore the interrupt flag so the interruption stays visible to the calling thread.
      Thread.currentThread().interrupt();
    }

    boolean result =
        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
            CLUSTER_NAME));
    Assert.assertTrue(result);

    Builder kb = _participants[1].getHelixDataAccessor().keyBuilder();
    ExternalView externalView =
        _participants[1].getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
    // Fail with a readable assertion instead of a NullPointerException below.
    Assert.assertNotNull(externalView, "No external view found for TestDB");

    for (String partitionName : externalView.getRecord().getMapFields().keySet()) {
      if (externalView.getRecord().getMapField(partitionName).containsKey(participant1)) {
        Assert.assertTrue(externalView.getRecord().getMapField(partitionName).get(participant1)
            .equalsIgnoreCase("OFFLINE"));
      }
    }
    Assert.assertTrue(externalView.getRecord().getMapField("TestDB_14").get(participant2)
        .equalsIgnoreCase("OFFLINE"));
  }

  /** Builds an error report with a random id, target session "*" and the given action. */
  private static Message newErrorReport(ActionOnError action) {
    Message errorMessage =
        new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
    errorMessage.setTgtSessionId("*");
    errorMessage.getRecord().setSimpleField(
        DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, action.toString());
    return errorMessage;
  }

  /** Builds non-session-specific recipient criteria addressing the CONTROLLER instance type. */
  private static Criteria newControllerCriteria() {
    Criteria recipientCriteria = new Criteria();
    recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
    recipientCriteria.setSessionSpecific(false);
    return recipientCriteria;
  }
}
package org.apache.helix.integration.paticipant;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.common.ZkIntegrationTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Stops one participant from inside another participant's state transition, then starts a new
 * participant under the same instance name and checks that the cluster converges again.
 */
public class TestRestartParticipant extends ZkIntegrationTestBase {

  /** Transition that stops the given participant the first time it runs, and only then. */
  public class KillOtherTransition extends MockTransition {
    final AtomicReference<MockParticipantManager> _other;

    public KillOtherTransition(MockParticipantManager other) {
      _other = new AtomicReference<MockParticipantManager>(other);
    }

    @Override
    public void doTransition(Message message, NotificationContext context) {
      // getAndSet(null) hands the reference to exactly one caller, so the stop happens once.
      MockParticipantManager other = _other.getAndSet(null);
      if (other != null) {
        System.err.println("Kill " + other.getInstanceName()
            + ". Interrupted exceptions are IGNORABLE");
        other.syncStop();
      }
    }
  }

  /**
   * Starts five participants, the last of which kills the first during a transition; verifies
   * convergence, restarts the first participant's instance and verifies convergence again.
   */
  @Test()
  public void testRestartParticipant() throws Exception {
    // Logger.getRootLogger().setLevel(Level.INFO);
    System.out.println("START testRestartParticipant at " + new Date(System.currentTimeMillis()));

    String clusterName = getShortClassName();
    MockParticipantManager[] participants = new MockParticipantManager[5];

    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
        "localhost", // participant name prefix
        "TestDB", // resource name prefix
        1, // resources
        10, // partitions per resource
        5, // number of nodes
        3, // replicas
        "MasterSlave", true); // do rebalance

    ClusterControllerManager controller =
        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
    controller.syncStart();

    // start participants
    for (int i = 0; i < 5; i++) {
      String instanceName = "localhost_" + (12918 + i);

      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
      if (i == 4) {
        // The last participant stops participants[0] during its first transition.
        participants[i].setTransition(new KillOtherTransition(participants[0]));
      }

      participants[i].syncStart();
    }

    boolean result =
        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
            clusterName));
    Assert.assertTrue(result);

    // restart
    Thread.sleep(500);
    MockParticipantManager participant =
        new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(),
            participants[0].getInstanceName());
    System.err.println("Restart " + participant.getInstanceName());
    participant.syncStart();
    result =
        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
            clusterName));
    Assert.assertTrue(result);

    // clean up
    controller.syncStop();
    for (int i = 0; i < 5; i++) {
      participants[i].syncStop();
    }
    participant.syncStop();

    // The closing banner used to print "START", which made the log look like a second run.
    System.out.println("END testRestartParticipant at " + new Date(System.currentTimeMillis()));

  }
}
+ */ + + import java.util.Collection; + import java.util.Date; + import java.util.HashMap; + import java.util.HashSet; + import java.util.Map; + import java.util.Set; + + import org.apache.helix.HelixDataAccessor; + import org.apache.helix.NotificationContext; + import org.apache.helix.PropertyKey.Builder; + import org.apache.helix.integration.common.ZkStandAloneCMTestBase; + import org.apache.helix.integration.manager.ClusterControllerManager; + import org.apache.helix.integration.manager.MockParticipantManager; + import org.apache.helix.messaging.handling.MessageHandler.ErrorCode; + import org.apache.helix.mock.participant.MockMSStateModel; + import org.apache.helix.mock.participant.MockTransition; + import org.apache.helix.mock.participant.SleepTransition; + import org.apache.helix.model.ExternalView; + import org.apache.helix.model.IdealState; + import org.apache.helix.model.Message; + import org.apache.helix.participant.statemachine.StateModelFactory; + import org.apache.helix.participant.statemachine.StateModelInfo; + import org.apache.helix.participant.statemachine.StateTransitionError; + import org.apache.helix.participant.statemachine.Transition; + import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterStateVerifier; -import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.MasterNbInExtViewVerifier; + import org.apache.log4j.Logger; + import org.testng.Assert; + import org.testng.annotations.BeforeClass; + import org.testng.annotations.Test; + + public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { + private static Logger LOG = Logger.getLogger(TestStateTransitionTimeout.class); + + @Override + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + 
String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(ZK_ADDR); + + // setup storage cluster + _setupTool.addCluster(CLUSTER_NAME, true); + _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL); + + for (int i = 0; i < NODE_NR; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3); + + // Set the timeout values + IdealState idealState = + _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); + String stateTransition = "SLAVE" + "-" + "MASTER" + "_" + Message.Attributes.TIMEOUT; + idealState.getRecord().setSimpleField(stateTransition, "300"); + + String command = + "-zkSvr " + ZK_ADDR + " -addResourceProperty " + CLUSTER_NAME + " " + TEST_DB + " " + + stateTransition + " 200"; + ClusterSetup.processCommandLineArgs(command.split(" ")); + } + + @StateModelInfo(initialState = "OFFLINE", states = { + "MASTER", "SLAVE", "ERROR" + }) + public static class TimeOutStateModel extends MockMSStateModel { + boolean _sleep = false; + StateTransitionError _error; + int _errorCallcount = 0; + + public TimeOutStateModel(MockTransition transition, boolean sleep) { + super(transition); + _sleep = sleep; + } + + @Transition(to = "MASTER", from = "SLAVE") + public void onBecomeMasterFromSlave(Message message, NotificationContext context) + throws InterruptedException { + LOG.info("Become MASTER from SLAVE"); + if (_transition != null && _sleep) { + _transition.doTransition(message, context); + } + } + + @Override + public void rollbackOnError(Message message, NotificationContext context, + StateTransitionError error) { + _error = error; + _errorCallcount++; + } + } + + public static class SleepStateModelFactory extends StateModelFactory<TimeOutStateModel> { + Set<String> 
partitionsToSleep = new HashSet<String>(); + int _sleepTime; + + public SleepStateModelFactory(int sleepTime) { + _sleepTime = sleepTime; + } + + public void setPartitions(Collection<String> partitions) { + partitionsToSleep.addAll(partitions); + } + + public void addPartition(String partition) { + partitionsToSleep.add(partition); + } + + @Override + public TimeOutStateModel createNewStateModel(String resource, String stateUnitKey) { + return new TimeOutStateModel(new SleepTransition(_sleepTime), + partitionsToSleep.contains(stateUnitKey)); + } + } + + @Test + public void testStateTransitionTimeOut() throws Exception { + Map<String, SleepStateModelFactory> factories = new HashMap<String, SleepStateModelFactory>(); + IdealState idealState = + _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); + for (int i = 0; i < NODE_NR; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + SleepStateModelFactory factory = new SleepStateModelFactory(1000); + factories.put(instanceName, factory); + for (String p : idealState.getPartitionSet()) { + if (idealState.getPreferenceList(p).get(0).equals(instanceName)) { + factory.addPartition(p); + } + } + + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", factory); + _participants[i].syncStart(); + } + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = + new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + boolean result = + ClusterStateVerifier + .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + HelixDataAccessor accessor = _participants[0].getHelixDataAccessor(); + + Builder kb = accessor.keyBuilder(); + ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB)); + for (String p : idealState.getPartitionSet()) { + String idealMaster = 
idealState.getPreferenceList(p).get(0); + Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR")); + + TimeOutStateModel model = factories.get(idealMaster).getStateModel(TEST_DB, p); + Assert.assertEquals(model._errorCallcount, 1); + Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT); + } + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java ---------------------------------------------------------------------- diff --cc helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java index 0000000,ec73252..35d57c4 mode 000000,100644..100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java @@@ -1,0 -1,234 +1,234 @@@ + package org.apache.helix.integration.rebalancer; + + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + import java.util.Date; + import java.util.HashMap; + import java.util.Map; + import org.apache.helix.HelixDataAccessor; + import org.apache.helix.HelixManager; + import org.apache.helix.PropertyKey.Builder; + import org.apache.helix.ZNRecord; + import org.apache.helix.controller.stages.ClusterDataCache; + import org.apache.helix.integration.common.ZkStandAloneCMTestBase; + import org.apache.helix.integration.manager.ClusterControllerManager; + import org.apache.helix.integration.manager.MockParticipantManager; + import org.apache.helix.manager.zk.ZKHelixDataAccessor; + import org.apache.helix.manager.zk.ZkBaseDataAccessor; + import org.apache.helix.manager.zk.ZkClient; + import org.apache.helix.model.ExternalView; + import org.apache.helix.model.IdealState.RebalanceMode; + import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterStateVerifier; -import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.ZkVerifier; + import org.apache.log4j.Logger; + import org.testng.Assert; + import org.testng.annotations.BeforeClass; + import org.testng.annotations.Test; + + public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase { + private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class + .getName()); + + @Override + @BeforeClass + public void beforeClass() throws Exception { + // Logger.getRootLogger().setLevel(Level.INFO); + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + _setupTool = new ClusterSetup(ZK_ADDR); + + // setup storage cluster + _setupTool.addCluster(CLUSTER_NAME, true); + + _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 100, "OnlineOffline", + 
RebalanceMode.FULL_AUTO + "", 0, 25); + for (int i = 0; i < NODE_NR; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 1); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + HelixManager manager = _controller; // _startCMResultMap.get(controllerName)._manager; + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + // start dummy participants + for (int i = 0; i < NODE_NR; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + Thread.sleep(2000); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient, + CLUSTER_NAME, TEST_DB)); + Assert.assertTrue(result); + ExternalView ev = + manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB)); + System.out.println(ev.getPartitionSet().size()); + if (i < 3) { + Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1)); + } else { + Assert.assertEquals(ev.getPartitionSet().size(), 100); + } + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient, + CLUSTER_NAME, TEST_DB)); + + Assert.assertTrue(result); + } + + @Test() + public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception { + HelixManager manager = _controller; + // kill 1 node + _participants[0].syncStop(); + + // verifyBalanceExternalView(); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient, + CLUSTER_NAME, TEST_DB)); + Assert.assertTrue(result); + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + 
ExternalView ev = + manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB)); + Assert.assertEquals(ev.getPartitionSet().size(), 100); + + _participants[1].syncStop(); + + // verifyBalanceExternalView(); + result = + ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient, + CLUSTER_NAME, TEST_DB)); + Assert.assertTrue(result); + ev = manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB)); + Assert.assertEquals(ev.getPartitionSet().size(), 75); + + // add 2 nodes + for (int i = 0; i < 2; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + + String newInstanceName = storageNodeName.replace(':', '_'); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newInstanceName); + participant.syncStart(); + } + + Thread.sleep(1000); + result = + ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient, + CLUSTER_NAME, TEST_DB)); + Assert.assertTrue(result); + } + + static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, + String masterState, int replica, int instances, int maxPerInstance) { + Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>(); + for (String partitionName : externalView.getMapFields().keySet()) { + Map<String, String> assignmentMap = externalView.getMapField(partitionName); + // Assert.assertTrue(assignmentMap.size() >= replica); + for (String instance : assignmentMap.keySet()) { + if (assignmentMap.get(instance).equals(masterState)) { + if (!masterPartitionsCountMap.containsKey(instance)) { + masterPartitionsCountMap.put(instance, 0); + } + masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1); + } + } + } + + int perInstancePartition = partitionCount / instances; + + int totalCount = 0; + for (String instanceName : 
masterPartitionsCountMap.keySet()) { + int instancePartitionCount = masterPartitionsCountMap.get(instanceName); + totalCount += instancePartitionCount; + if (!(instancePartitionCount == perInstancePartition + || instancePartitionCount == perInstancePartition + 1 || instancePartitionCount == maxPerInstance)) { + return false; + } + if (instancePartitionCount == maxPerInstance) { + continue; + } + if (instancePartitionCount == perInstancePartition + 1) { + if (partitionCount % instances == 0) { + return false; + } + } + } + if (totalCount == maxPerInstance * instances) { + return true; + } + if (partitionCount != totalCount) { + return false; + } + return true; + + } + + public static class ExternalViewBalancedVerifier implements ZkVerifier { + String _clusterName; + String _resourceName; + + public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) { + _clusterName = clusterName; + _resourceName = resourceName; + } + + @Override + public boolean verify() { + HelixDataAccessor accessor = + new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + Builder keyBuilder = accessor.keyBuilder(); + int numberOfPartitions = + accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields() + .size(); + ClusterDataCache cache = new ClusterDataCache(); + cache.refresh(accessor); + String masterValue = + cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()) + .getStatesPriorityList().get(0); + int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas()); + return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName)) + .getRecord(), numberOfPartitions, masterValue, replicas, cache.getLiveInstances().size(), + cache.getIdealState(_resourceName).getMaxPartitionsPerInstance()); + } + + @Override + public ZkClient getZkClient() { + return _gZkClient; + } + + @Override + public String getClusterName() { + return 
_clusterName; + } + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java ---------------------------------------------------------------------- diff --cc helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java index 0000000,9098e8e..af2b5a1 mode 000000,100644..100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java @@@ -1,0 -1,204 +1,204 @@@ + package org.apache.helix.integration.rebalancer; + + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + import com.beust.jcommander.internal.Lists; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import org.apache.helix.HelixDataAccessor; + import org.apache.helix.HelixManager; + import org.apache.helix.PropertyKey.Builder; + import org.apache.helix.ZNRecord; + import org.apache.helix.controller.rebalancer.Rebalancer; + import org.apache.helix.controller.stages.ClusterDataCache; + import org.apache.helix.controller.stages.CurrentStateOutput; + import org.apache.helix.integration.common.ZkStandAloneCMTestBase; + import org.apache.helix.manager.zk.ZKHelixDataAccessor; + import org.apache.helix.manager.zk.ZkBaseDataAccessor; + import org.apache.helix.manager.zk.ZkClient; + import org.apache.helix.model.ExternalView; + import org.apache.helix.model.IdealState; + import org.apache.helix.model.IdealState.IdealStateProperty; + import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.tools.ClusterStateVerifier; -import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.ZkVerifier; + import org.testng.Assert; + import org.testng.annotations.Test; + + public class TestCustomizedIdealStateRebalancer extends ZkStandAloneCMTestBase { + String db2 = TEST_DB + "2"; + static boolean testRebalancerCreated = false; + static boolean testRebalancerInvoked = false; + + public static class TestRebalancer implements Rebalancer { + + @Override + public void init(HelixManager manager) { + testRebalancerCreated = true; + } + + @Override + public IdealState computeNewIdealState(String resourceName, IdealState currentIdealState, + CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) { + testRebalancerInvoked = true; + List<String> liveNodes = Lists.newArrayList(clusterData.getLiveInstances().keySet()); + int i = 0; + for (String partition : 
currentIdealState.getPartitionSet()) { + int index = i++ % liveNodes.size(); + String instance = liveNodes.get(index); + currentIdealState.getPreferenceList(partition).clear(); + currentIdealState.getPreferenceList(partition).add(instance); + + currentIdealState.getInstanceStateMap(partition).clear(); + currentIdealState.getInstanceStateMap(partition).put(instance, "MASTER"); + } + currentIdealState.setReplicas("1"); + return currentIdealState; + } + } + + @Test + public void testCustomizedIdealStateRebalancer() throws InterruptedException { + _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "MasterSlave"); + _setupTool.addResourceProperty(CLUSTER_NAME, db2, + IdealStateProperty.REBALANCER_CLASS_NAME.toString(), + TestCustomizedIdealStateRebalancer.TestRebalancer.class.getName()); + _setupTool.addResourceProperty(CLUSTER_NAME, db2, IdealStateProperty.REBALANCE_MODE.toString(), + RebalanceMode.USER_DEFINED.toString()); + + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3); + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_gZkClient, + CLUSTER_NAME, db2)); + Assert.assertTrue(result); + Thread.sleep(1000); + HelixDataAccessor accessor = + new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + Builder keyBuilder = accessor.keyBuilder(); + ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2)); + Assert.assertEquals(ev.getPartitionSet().size(), 60); + for (String partition : ev.getPartitionSet()) { + Assert.assertEquals(ev.getStateMap(partition).size(), 1); + } + IdealState is = accessor.getProperty(keyBuilder.idealStates(db2)); + for (String partition : is.getPartitionSet()) { + Assert.assertEquals(is.getPreferenceList(partition).size(), 0); + Assert.assertEquals(is.getInstanceStateMap(partition).size(), 0); + } + Assert.assertTrue(testRebalancerCreated); + Assert.assertTrue(testRebalancerInvoked); + } + + public static class ExternalViewBalancedVerifier 
implements ZkVerifier { + ZkClient _client; + String _clusterName; + String _resourceName; + + public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName) { + _client = client; + _clusterName = clusterName; + _resourceName = resourceName; + } + + @Override + public boolean verify() { + try { + HelixDataAccessor accessor = + new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client)); + Builder keyBuilder = accessor.keyBuilder(); + int numberOfPartitions = + accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields() + .size(); + ClusterDataCache cache = new ClusterDataCache(); + cache.refresh(accessor); + String masterValue = + cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()) + .getStatesPriorityList().get(0); + int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas()); + String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag(); + int instances = 0; + for (String liveInstanceName : cache.getLiveInstances().keySet()) { + if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) { + instances++; + } + } + if (instances == 0) { + instances = cache.getLiveInstances().size(); + } + return verifyBalanceExternalView( + accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(), + numberOfPartitions, masterValue, replicas, instances); + } catch (Exception e) { + return false; + } + } + + @Override + public ZkClient getZkClient() { + return _client; + } + + @Override + public String getClusterName() { + return _clusterName; + } + + } + + static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, + String masterState, int replica, int instances) { + Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>(); + for (String partitionName : externalView.getMapFields().keySet()) { + Map<String, String> assignmentMap = 
externalView.getMapField(partitionName); + // Assert.assertTrue(assignmentMap.size() >= replica); + for (String instance : assignmentMap.keySet()) { + if (assignmentMap.get(instance).equals(masterState)) { + if (!masterPartitionsCountMap.containsKey(instance)) { + masterPartitionsCountMap.put(instance, 0); + } + masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1); + } + } + } + + int perInstancePartition = partitionCount / instances; + + int totalCount = 0; + for (String instanceName : masterPartitionsCountMap.keySet()) { + int instancePartitionCount = masterPartitionsCountMap.get(instanceName); + totalCount += instancePartitionCount; + if (!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition + 1)) { + return false; + } + if (instancePartitionCount == perInstancePartition + 1) { + if (partitionCount % instances == 0) { + return false; + } + } + } + if (partitionCount != totalCount) { + return false; + } + return true; + + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java ---------------------------------------------------------------------- diff --cc helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java index 0000000,335b99d..813ee69 mode 000000,100644..100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java @@@ -1,0 -1,478 +1,478 @@@ + package org.apache.helix.integration.rebalancer; + + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + import com.google.common.collect.ImmutableSet; + import com.google.common.collect.Maps; + import com.google.common.collect.Sets; + import java.util.Date; + import java.util.Map; + import java.util.Set; + import org.apache.helix.BaseDataAccessor; + import org.apache.helix.HelixAdmin; + import org.apache.helix.HelixDataAccessor; + import org.apache.helix.PropertyKey; + import org.apache.helix.TestHelper; + import org.apache.helix.TestHelper.Verifier; + import org.apache.helix.ZNRecord; + import org.apache.helix.ZkUnitTestBase; + import org.apache.helix.integration.manager.ClusterControllerManager; + import org.apache.helix.integration.manager.MockParticipantManager; + import org.apache.helix.manager.zk.ZKHelixAdmin; + import org.apache.helix.manager.zk.ZKHelixDataAccessor; + import org.apache.helix.manager.zk.ZkBaseDataAccessor; + import org.apache.helix.manager.zk.ZkClient; + import org.apache.helix.model.ExternalView; + import org.apache.helix.model.IdealState; + import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.tools.ClusterStateVerifier; -import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; -import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier; ++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier; ++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.ZkVerifier; + import org.apache.helix.util.ZKClientPool; + import org.apache.log4j.Logger; + import org.testng.Assert; + import org.testng.annotations.Test; + + /** + * Test that node tagging behaves correctly in FULL_AUTO mode + */ + public class TestFullAutoNodeTagging extends ZkUnitTestBase { + private static final Logger LOG = Logger.getLogger(TestFullAutoNodeTagging.class); + + @Test + public void testUntag() throws Exception { + final int NUM_PARTICIPANTS = 2; + final int NUM_PARTITIONS = 4; + final int NUM_REPLICAS = 1; + final String RESOURCE_NAME = "TestResource0"; + final String TAG = "ASSIGNABLE"; + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + final String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestResource", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "OnlineOffline", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging + true); // do rebalance + + // Tag the resource + final HelixAdmin helixAdmin = new ZKHelixAdmin(_gZkClient); + IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME); + idealState.setInstanceGroupTag(TAG); + helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState); + + // Get a data accessor + final HelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + final PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // Tag the participants + for 
(int i = 0; i < NUM_PARTICIPANTS; i++) { + final String instanceName = "localhost_" + (12918 + i); + helixAdmin.addInstanceTag(clusterName, instanceName, TAG); + } + + // Start controller + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + // Start participants + MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS]; + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + final String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + // Verify that there are NUM_PARTITIONS partitions in the external view, each having + // NUM_REPLICAS replicas, where all assigned replicas are to tagged nodes, and they are all + // ONLINE. + Verifier v = new Verifier() { + @Override + public boolean verify() throws Exception { + ExternalView externalView = + pollForProperty(ExternalView.class, accessor, keyBuilder.externalView(RESOURCE_NAME), + true); + if (externalView == null) { + return false; + } + Set<String> taggedInstances = + Sets.newHashSet(helixAdmin.getInstancesInClusterWithTag(clusterName, TAG)); + Set<String> partitionSet = externalView.getPartitionSet(); + if (partitionSet.size() != NUM_PARTITIONS) { + return false; + } + for (String partitionName : partitionSet) { + Map<String, String> stateMap = externalView.getStateMap(partitionName); + if (stateMap.size() != NUM_REPLICAS) { + return false; + } + for (String participantName : stateMap.keySet()) { + if (!taggedInstances.contains(participantName)) { + return false; + } + String state = stateMap.get(participantName); + if (!state.equalsIgnoreCase("ONLINE")) { + return false; + } + } + } + return true; + } + }; + + // Run the verifier for both nodes tagged + boolean initialResult = TestHelper.verify(v, 10 * 1000); + Assert.assertTrue(initialResult); + + // Untag a node + 
helixAdmin.removeInstanceTag(clusterName, "localhost_12918", TAG); + + // Verify again + boolean finalResult = TestHelper.verify(v, 10 * 1000); + Assert.assertTrue(finalResult); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + /** + * Ensure that no assignments happen when there are no tagged nodes, but the resource is tagged + */ + @Test + public void testResourceTaggedFirst() throws Exception { + final int NUM_PARTICIPANTS = 10; + final int NUM_PARTITIONS = 4; + final int NUM_REPLICAS = 2; + final String RESOURCE_NAME = "TestDB0"; + final String TAG = "ASSIGNABLE"; + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging + true); // do rebalance + + // tag the resource + HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR); + IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME); + idealState.setInstanceGroupTag(TAG); + helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState); + + // start controller + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS]; + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + final String instanceName = "localhost_" + (12918 + i); + + participants[i] = new 
MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + Thread.sleep(1000); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new EmptyZkVerifier(clusterName, RESOURCE_NAME)); + Assert.assertTrue(result, "External view and current state must be empty"); + + // cleanup + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + participants[i].syncStop(); + } + controller.syncStop(); + } + + /** + * Basic test for tagging behavior. 10 participants, of which 4 are tagged. Launch all 10, + * checking external view every time a tagged node is started. Then shut down all 10, checking + * external view every time a tagged node is killed. + */ + @Test + public void testSafeAssignment() throws Exception { + final int NUM_PARTICIPANTS = 10; + final int NUM_PARTITIONS = 4; + final int NUM_REPLICAS = 2; + final String RESOURCE_NAME = "TestDB0"; + final String TAG = "ASSIGNABLE"; + + final String[] TAGGED_NODES = { + "localhost_12920", "localhost_12922", "localhost_12924", "localhost_12925" + }; + Set<String> taggedNodes = Sets.newHashSet(TAGGED_NODES); + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging + true); // do rebalance + + // tag the resource and participants + HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR); + for (String taggedNode : TAGGED_NODES) { + helixAdmin.addInstanceTag(clusterName, taggedNode, TAG); + } + IdealState idealState = 
helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME); + idealState.setInstanceGroupTag(TAG); + helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState); + + // start controller + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS]; + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + final String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + + // ensure that everything is valid if this is a tagged node that is starting + if (taggedNodes.contains(instanceName)) { + // make sure that the best possible matches the external view + Thread.sleep(500); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // make sure that the tagged state of the nodes is still balanced + result = + ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName, + RESOURCE_NAME, TAGGED_NODES, false)); + Assert.assertTrue(result, "initial assignment with all tagged nodes live is invalid"); + } + } + + // cleanup + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + String participantName = participants[i].getInstanceName(); + participants[i].syncStop(); + if (taggedNodes.contains(participantName)) { + // check that the external view is still correct even after removing tagged nodes + taggedNodes.remove(participantName); + Thread.sleep(500); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new TaggedZkVerifier(clusterName, + RESOURCE_NAME, TAGGED_NODES, taggedNodes.isEmpty())); + Assert.assertTrue(result, "incorrect state after removing " + participantName + ", " + + taggedNodes + " remain"); + } + } + controller.syncStop(); + System.out.println("END " + 
clusterName + " at " + new Date(System.currentTimeMillis())); + } + + /** + * Checker for basic validity of the external view given node tagging requirements + */ + private static class TaggedZkVerifier implements ZkVerifier { + private final String _clusterName; + private final String _resourceName; + private final String[] _taggedNodes; + private final boolean _isEmptyAllowed; + private final ZkClient _zkClient; + + /** + * Create a verifier for a specific cluster and resource + * @param clusterName the cluster to verify + * @param resourceName the resource within the cluster to verify + * @param taggedNodes nodes tagged with the resource tag + * @param isEmptyAllowed true if empty assignments are legal + */ + public TaggedZkVerifier(String clusterName, String resourceName, String[] taggedNodes, + boolean isEmptyAllowed) { + _clusterName = clusterName; + _resourceName = resourceName; + _taggedNodes = taggedNodes; + _isEmptyAllowed = isEmptyAllowed; + _zkClient = ZKClientPool.getZkClient(ZK_ADDR); + } + + @Override + public boolean verify() { + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); + HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName)); + + Set<String> taggedNodeSet = ImmutableSet.copyOf(_taggedNodes); + + // set up counts of partitions, masters, and slaves per node + Map<String, Integer> partitionCount = Maps.newHashMap(); + int partitionSum = 0; + Map<String, Integer> masterCount = Maps.newHashMap(); + int masterSum = 0; + Map<String, Integer> slaveCount = Maps.newHashMap(); + int slaveSum = 0; + + for (String partitionName : externalView.getPartitionSet()) { + Map<String, String> stateMap = externalView.getStateMap(partitionName); + for (String participantName : stateMap.keySet()) { + String state = stateMap.get(participantName); + if 
(state.equalsIgnoreCase("MASTER") || state.equalsIgnoreCase("SLAVE")) { + partitionSum++; + incrementCount(partitionCount, participantName); + if (!taggedNodeSet.contains(participantName)) { + // not allowed to have a non-tagged node assigned + LOG.error("Participant " + participantName + " is not tag, but has an assigned node"); + return false; + } else if (state.equalsIgnoreCase("MASTER")) { + masterSum++; + incrementCount(masterCount, participantName); + } else if (state.equalsIgnoreCase("SLAVE")) { + slaveSum++; + incrementCount(slaveCount, participantName); + } + } + } + } + + // check balance in partitions per node + if (partitionCount.size() > 0) { + boolean partitionMapDividesEvenly = partitionSum % partitionCount.size() == 0; + boolean withinAverage = + withinAverage(partitionCount, _isEmptyAllowed, partitionMapDividesEvenly); + if (!withinAverage) { + LOG.error("partition counts deviate from average"); + return false; + } + } else { + if (!_isEmptyAllowed) { + LOG.error("partition assignments are empty"); + return false; + } + } + + // check balance in masters per node + if (masterCount.size() > 0) { + boolean masterMapDividesEvenly = masterSum % masterCount.size() == 0; + boolean withinAverage = withinAverage(masterCount, _isEmptyAllowed, masterMapDividesEvenly); + if (!withinAverage) { + LOG.error("master counts deviate from average"); + return false; + } + } else { + if (!_isEmptyAllowed) { + LOG.error("master assignments are empty"); + return false; + } + } + + // check balance in slaves per node + if (slaveCount.size() > 0) { + boolean slaveMapDividesEvenly = slaveSum % slaveCount.size() == 0; + boolean withinAverage = withinAverage(slaveCount, true, slaveMapDividesEvenly); + if (!withinAverage) { + LOG.error("slave counts deviate from average"); + return false; + } + } + return true; + } + + private void incrementCount(Map<String, Integer> countMap, String key) { + if (!countMap.containsKey(key)) { + countMap.put(key, 0); + } + countMap.put(key, 
countMap.get(key) + 1); + } + + private boolean withinAverage(Map<String, Integer> countMap, boolean isEmptyAllowed, + boolean dividesEvenly) { + if (countMap.size() == 0) { + if (!isEmptyAllowed) { + LOG.error("Map not allowed to be empty"); + return false; + } + return true; + } + int upperBound = 1; + if (!dividesEvenly) { + upperBound = 2; + } + int average = computeAverage(countMap); + for (String participantName : countMap.keySet()) { + int count = countMap.get(participantName); + if (count < average - 1 || count > average + upperBound) { + LOG.error("Count " + count + " for " + participantName + " too far from average of " + + average); + return false; + } + } + return true; + } + + private int computeAverage(Map<String, Integer> countMap) { + if (countMap.size() == 0) { + return -1; + } + int total = 0; + for (int value : countMap.values()) { + total += value; + } + return total / countMap.size(); + } + + @Override + public ZkClient getZkClient() { + return _zkClient; + } + + @Override + public String getClusterName() { + return _clusterName; + } + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java ---------------------------------------------------------------------- diff --cc helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java index 7fb9746,0cf1ca3..53db79b --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java @@@ -19,9 -19,9 +19,9 @@@ package org.apache.helix.manager.zk * under the License. 
*/ - import org.apache.helix.integration.ZkStandAloneCMTestBase; + import org.apache.helix.integration.common.ZkStandAloneCMTestBase; import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier; import org.testng.Assert; import org.testng.annotations.Test; http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java ---------------------------------------------------------------------- diff --cc helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java index 735325f,29df0b7..6808292 --- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java @@@ -34,9 -34,10 +34,10 @@@ import org.apache.helix.integration.man import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.IdealState; import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver; + import org.apache.helix.monitoring.mbeans.MonitorDomainNames; import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterStateVerifier; -import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier; +import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier; import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.AfterClass; http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java ----------------------------------------------------------------------