http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
index 0000000,6572879..6472389
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
@@@ -1,0 -1,459 +1,459 @@@
+ package org.apache.helix.integration.messaging;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.lang.reflect.Field;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ 
+ import org.apache.helix.ControllerChangeListener;
+ import org.apache.helix.ExternalViewChangeListener;
+ import org.apache.helix.HelixAdmin;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.IdealStateChangeListener;
+ import org.apache.helix.InstanceConfigChangeListener;
+ import org.apache.helix.InstanceType;
+ import org.apache.helix.LiveInstanceChangeListener;
+ import org.apache.helix.NotificationContext;
+ import org.apache.helix.PropertyKey.Builder;
+ import org.apache.helix.TestHelper;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.controller.HelixControllerMain;
+ import org.apache.helix.integration.common.ZkIntegrationTestBase;
+ import org.apache.helix.manager.zk.ZKHelixAdmin;
+ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+ import org.apache.helix.manager.zk.ZKHelixManager;
+ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+ import org.apache.helix.model.ClusterConstraints;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.InstanceConfig;
+ import org.apache.helix.model.LiveInstance;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.helix.model.builder.ConstraintItemBuilder;
+ import org.apache.helix.participant.StateMachineEngine;
+ import org.apache.helix.participant.statemachine.StateModel;
+ import org.apache.helix.participant.statemachine.StateModelFactory;
+ import org.apache.helix.participant.statemachine.StateModelInfo;
+ import org.apache.helix.participant.statemachine.Transition;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+ import org.apache.log4j.Logger;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ 
+ // test case from Ming Fang
+ public class TestMessageThrottle2 extends ZkIntegrationTestBase {
+   final static String clusterName = "TestMessageThrottle2";
+   final static String resourceName = "MyResource";
+ 
+   @Test
+   public void test() throws Exception {
+     System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+ 
+     startAdmin();
+     startController();
+ 
+     // start node2 first
+     Node.main(new String[] {
+       "2"
+     });
+ 
+     // wait for node2 becoming MASTER
+     final Builder keyBuilder = new Builder(clusterName);
+     final HelixDataAccessor accessor =
+         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+     TestHelper.verify(new TestHelper.Verifier() {
+ 
+       @Override
+       public boolean verify() throws Exception {
+         ExternalView view = 
accessor.getProperty(keyBuilder.externalView(resourceName));
+         String state = null;
+ 
+         if (view != null) {
+           Map<String, String> map = view.getStateMap(resourceName);
+           if (map != null) {
+             state = map.get("node2");
+           }
+         }
+         return state != null && state.equals("MASTER");
+       }
+     }, 10 * 1000);
+ 
+     // start node 1
+     Node.main(new String[] {
+       "1"
+     });
+ 
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
+             clusterName));
+     Assert.assertTrue(result);
+ 
+     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+   }
+ 
+   void startController() throws Exception {
+     // start helixController
+     System.out.println(String.format("Starting Controller{Cluster:%s, 
Port:%s, Zookeeper:%s}",
+         clusterName, 12000, ZK_ADDR));
+     HelixManager helixController =
+         HelixControllerMain.startHelixController(ZK_ADDR, clusterName, 
"localhost_" + 12000,
+             HelixControllerMain.STANDALONE);
+ 
+     StatusPrinter statusPrinter = new StatusPrinter();
+     statusPrinter.registerWith(helixController);
+   }
+ 
+   void startAdmin() throws Exception {
+     HelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+ 
+     // create cluster
+     System.out.println("Creating cluster: " + clusterName);
+     admin.addCluster(clusterName, true);
+ 
+     // add MasterSlave state mode definition
+     admin.addStateModelDef(clusterName, "MasterSlave", new 
StateModelDefinition(
+         generateConfigForMasterSlave()));
+ 
+     // ideal-state znrecord
+     ZNRecord record = new ZNRecord(resourceName);
+     record.setSimpleField("IDEAL_STATE_MODE", "AUTO");
+     record.setSimpleField("NUM_PARTITIONS", "1");
+     record.setSimpleField("REPLICAS", "2");
+     record.setSimpleField("STATE_MODEL_DEF_REF", "MasterSlave");
+     record.setListField(resourceName, Arrays.asList("node1", "node2"));
+ 
+     admin.setResourceIdealState(clusterName, resourceName, new 
IdealState(record));
+ 
+     ConstraintItemBuilder builder = new ConstraintItemBuilder();
+ 
+     // limit one transition message at a time across the entire cluster
+     builder.addConstraintAttribute("MESSAGE_TYPE", "STATE_TRANSITION")
+     // .addConstraintAttribute("INSTANCE", ".*") // un-comment this line if 
using instance-level
+     // constraint
+         .addConstraintAttribute("CONSTRAINT_VALUE", "1");
+     admin.setConstraint(clusterName, 
ClusterConstraints.ConstraintType.MESSAGE_CONSTRAINT,
+         "constraint1", builder.build());
+   }
+ 
+   ZNRecord generateConfigForMasterSlave() {
+     ZNRecord record = new ZNRecord("MasterSlave");
+     record.setSimpleField(
+         
StateModelDefinition.StateModelDefinitionProperty.INITIAL_STATE.toString(), 
"OFFLINE");
+     List<String> statePriorityList = new ArrayList<String>();
+     statePriorityList.add("MASTER");
+     statePriorityList.add("SLAVE");
+     statePriorityList.add("OFFLINE");
+     statePriorityList.add("DROPPED");
+     statePriorityList.add("ERROR");
+     record.setListField(
+         
StateModelDefinition.StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+         statePriorityList);
+     for (String state : statePriorityList) {
+       String key = state + ".meta";
+       Map<String, String> metadata = new HashMap<String, String>();
+       if (state.equals("MASTER")) {
+         metadata.put("count", "1");
+         record.setMapField(key, metadata);
+       } else if (state.equals("SLAVE")) {
+         metadata.put("count", "R");
+         record.setMapField(key, metadata);
+       } else if (state.equals("OFFLINE")) {
+         metadata.put("count", "-1");
+         record.setMapField(key, metadata);
+       } else if (state.equals("DROPPED")) {
+         metadata.put("count", "-1");
+         record.setMapField(key, metadata);
+       } else if (state.equals("ERROR")) {
+         metadata.put("count", "-1");
+         record.setMapField(key, metadata);
+       }
+     }
+     for (String state : statePriorityList) {
+       String key = state + ".next";
+       if (state.equals("MASTER")) {
+         Map<String, String> metadata = new HashMap<String, String>();
+         metadata.put("SLAVE", "SLAVE");
+         metadata.put("OFFLINE", "SLAVE");
+         metadata.put("DROPPED", "SLAVE");
+         record.setMapField(key, metadata);
+       } else if (state.equals("SLAVE")) {
+         Map<String, String> metadata = new HashMap<String, String>();
+         metadata.put("MASTER", "MASTER");
+         metadata.put("OFFLINE", "OFFLINE");
+         metadata.put("DROPPED", "OFFLINE");
+         record.setMapField(key, metadata);
+       } else if (state.equals("OFFLINE")) {
+         Map<String, String> metadata = new HashMap<String, String>();
+         metadata.put("SLAVE", "SLAVE");
+         metadata.put("MASTER", "SLAVE");
+         metadata.put("DROPPED", "DROPPED");
+         record.setMapField(key, metadata);
+       } else if (state.equals("ERROR")) {
+         Map<String, String> metadata = new HashMap<String, String>();
+         metadata.put("OFFLINE", "OFFLINE");
+         record.setMapField(key, metadata);
+       }
+     }
+ 
+     // change the transition priority list
+     List<String> stateTransitionPriorityList = new ArrayList<String>();
+     stateTransitionPriorityList.add("SLAVE-MASTER");
+     stateTransitionPriorityList.add("OFFLINE-SLAVE");
+     stateTransitionPriorityList.add("MASTER-SLAVE");
+     stateTransitionPriorityList.add("SLAVE-OFFLINE");
+     stateTransitionPriorityList.add("OFFLINE-DROPPED");
+     record.setListField(
+         
StateModelDefinition.StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+         stateTransitionPriorityList);
+     return record;
+     // ZNRecordSerializer serializer = new ZNRecordSerializer();
+     // System.out.println(new String(serializer.serialize(record)));
+   }
+ 
+   static final class MyProcess {
+     private final String instanceName;
+     private HelixManager helixManager;
+ 
+     public MyProcess(String instanceName) {
+       this.instanceName = instanceName;
+     }
+ 
+     public void start() throws Exception {
+       helixManager =
+           new ZKHelixManager(clusterName, instanceName, 
InstanceType.PARTICIPANT, ZK_ADDR);
+       {
+         // hack to set sessionTimeout
+         Field sessionTimeout = 
ZKHelixManager.class.getDeclaredField("_sessionTimeout");
+         sessionTimeout.setAccessible(true);
+         sessionTimeout.setInt(helixManager, 1000);
+       }
+ 
+       StateMachineEngine stateMach = helixManager.getStateMachineEngine();
+       stateMach.registerStateModelFactory("MasterSlave", new 
MyStateModelFactory(helixManager));
+       helixManager.connect();
+ 
+       StatusPrinter statusPrinter = new StatusPrinter();
+       statusPrinter.registerWith(helixManager);
+     }
+ 
+     public void stop() {
+       helixManager.disconnect();
+     }
+   }
+ 
+   @StateModelInfo(initialState = "OFFLINE", states = {
+       "MASTER", "SLAVE", "ERROR"
+   })
+   public static class MyStateModel extends StateModel {
+     private static final Logger LOGGER = Logger.getLogger(MyStateModel.class);
+ 
+     private final HelixManager helixManager;
+ 
+     public MyStateModel(HelixManager helixManager) {
+       this.helixManager = helixManager;
+     }
+ 
+     @Transition(to = "SLAVE", from = "OFFLINE")
+     public void onBecomeSlaveFromOffline(Message message, NotificationContext 
context) {
+       String partitionName = message.getPartitionName();
+       String instanceName = message.getTgtName();
+       LOGGER.info(instanceName + " becomes SLAVE from OFFLINE for " + 
partitionName);
+     }
+ 
+     @Transition(to = "SLAVE", from = "MASTER")
+     public void onBecomeSlaveFromMaster(Message message, NotificationContext 
context) {
+       String partitionName = message.getPartitionName();
+       String instanceName = message.getTgtName();
+       LOGGER.info(instanceName + " becomes SLAVE from MASTER for " + 
partitionName);
+     }
+ 
+     @Transition(to = "MASTER", from = "SLAVE")
+     public void onBecomeMasterFromSlave(Message message, NotificationContext 
context) {
+       String partitionName = message.getPartitionName();
+       String instanceName = message.getTgtName();
+       LOGGER.info(instanceName + " becomes MASTER from SLAVE for " + 
partitionName);
+     }
+ 
+     @Transition(to = "OFFLINE", from = "SLAVE")
+     public void onBecomeOfflineFromSlave(Message message, NotificationContext 
context) {
+       String partitionName = message.getPartitionName();
+       String instanceName = message.getTgtName();
+       LOGGER.info(instanceName + " becomes OFFLINE from SLAVE for " + 
partitionName);
+     }
+ 
+     @Transition(to = "DROPPED", from = "OFFLINE")
+     public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
+       String partitionName = message.getPartitionName();
+       String instanceName = message.getTgtName();
+       LOGGER.info(instanceName + " becomes DROPPED from OFFLINE for " + 
partitionName);
+     }
+ 
+     @Transition(to = "OFFLINE", from = "ERROR")
+     public void onBecomeOfflineFromError(Message message, NotificationContext 
context) {
+       String partitionName = message.getPartitionName();
+       String instanceName = message.getTgtName();
+       LOGGER.info(instanceName + " becomes OFFLINE from ERROR for " + 
partitionName);
+     }
+   }
+ 
+   static class MyStateModelFactory extends StateModelFactory<MyStateModel> {
+     private final HelixManager helixManager;
+ 
+     public MyStateModelFactory(HelixManager helixManager) {
+       this.helixManager = helixManager;
+     }
+ 
+     @Override
+     public MyStateModel createNewStateModel(String resource, String 
partitionName) {
+       return new MyStateModel(helixManager);
+     }
+   }
+ 
+   static class Node {
+     // ------------------------------ FIELDS ------------------------------
+ 
+     private static final Logger LOGGER = Logger.getLogger(Node.class);
+ 
+     // -------------------------- INNER CLASSES --------------------------
+ 
+     // --------------------------- main() method ---------------------------
+ 
+     public static void main(String[] args) throws Exception {
+       if (args.length < 1) {
+         LOGGER.info("usage: id");
+         System.exit(0);
+       }
+       int id = Integer.parseInt(args[0]);
+       String instanceName = "node" + id;
+ 
+       addInstanceConfig(instanceName);
+       startProcess(instanceName);
+     }
+ 
+     private static void addInstanceConfig(String instanceName) {
+       // add node to cluster if not already added
+       ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
+ 
+       InstanceConfig instanceConfig = null;
+       try {
+         instanceConfig = admin.getInstanceConfig(clusterName, instanceName);
+       } catch (Exception e) {
+       }
+       if (instanceConfig == null) {
+         InstanceConfig config = new InstanceConfig(instanceName);
+         config.setHostName("localhost");
+         config.setInstanceEnabled(true);
+         echo("Adding InstanceConfig:" + config);
+         admin.addInstance(clusterName, config);
+       }
+     }
+ 
+     public static void echo(Object obj) {
+       LOGGER.info(obj);
+     }
+ 
+     private static void startProcess(String instanceName) throws Exception {
+       MyProcess process = new MyProcess(instanceName);
+       process.start();
+     }
+   }
+ 
+   static class StatusPrinter implements IdealStateChangeListener, 
InstanceConfigChangeListener,
+       ExternalViewChangeListener, LiveInstanceChangeListener, 
ControllerChangeListener {
+     // ------------------------------ FIELDS ------------------------------
+ 
+     private HelixManager helixManager;
+ 
+     // ------------------------ INTERFACE METHODS ------------------------
+ 
+     // --------------------- Interface ControllerChangeListener
+     // ---------------------
+ 
+     @Override
+     public void onControllerChange(NotificationContext changeContext) {
+       System.out.println("StatusPrinter.onControllerChange:" + changeContext);
+     }
+ 
+     // --------------------- Interface ExternalViewChangeListener
+     // ---------------------
+ 
+     @Override
+     public void onExternalViewChange(List<ExternalView> externalViewList,
+         NotificationContext changeContext) {
+       for (ExternalView externalView : externalViewList) {
+         System.out
+             .println("StatusPrinter.onExternalViewChange:" + "externalView = 
" + externalView);
+       }
+     }
+ 
+     // --------------------- Interface IdealStateChangeListener
+     // ---------------------
+ 
+     @Override
+     public void onIdealStateChange(List<IdealState> idealState, 
NotificationContext changeContext) {
+       for (IdealState state : idealState) {
+         System.out.println("StatusPrinter.onIdealStateChange:" + "state = " + 
state);
+       }
+     }
+ 
+     // --------------------- Interface InstanceConfigChangeListener
+     // ---------------------
+ 
+     @Override
+     public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+         NotificationContext context) {
+       for (InstanceConfig instanceConfig : instanceConfigs) {
+         System.out.println("StatusPrinter.onInstanceConfigChange:" + 
"instanceConfig = "
+             + instanceConfig);
+       }
+     }
+ 
+     // --------------------- Interface LiveInstanceChangeListener
+     // ---------------------
+ 
+     @Override
+     public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+         NotificationContext changeContext) {
+       for (LiveInstance liveInstance : liveInstances) {
+         System.out
+             .println("StatusPrinter.onLiveInstanceChange:" + "liveInstance = 
" + liveInstance);
+       }
+     }
+ 
+     // -------------------------- OTHER METHODS --------------------------
+ 
+     public void registerWith(HelixManager helixManager) throws Exception {
+       this.helixManager = helixManager;
+       helixManager.addIdealStateChangeListener(this);
+       helixManager.addInstanceConfigChangeListener(this);
+       helixManager.addExternalViewChangeListener(this);
+       helixManager.addLiveInstanceChangeListener(this);
+       helixManager.addControllerListener(this);
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNonOfflineInitState.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNonOfflineInitState.java
index 0000000,1cc58bf..19f2bf4
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNonOfflineInitState.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNonOfflineInitState.java
@@@ -1,0 -1,114 +1,114 @@@
+ package org.apache.helix.integration.paticipant;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.Date;
+ 
+ import org.apache.helix.TestHelper;
+ import org.apache.helix.integration.common.ZkIntegrationTestBase;
+ import org.apache.helix.integration.manager.ClusterControllerManager;
+ import org.apache.helix.integration.manager.MockParticipantManager;
+ import org.apache.helix.mock.participant.MockBootstrapModelFactory;
+ import org.apache.helix.participant.StateMachineEngine;
+ import org.apache.helix.tools.ClusterSetup;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+ import org.apache.log4j.Logger;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ 
+ public class TestNonOfflineInitState extends ZkIntegrationTestBase {
+   private static Logger LOG = Logger.getLogger(TestNonOfflineInitState.class);
+ 
+   @Test
+   public void testNonOfflineInitState() throws Exception {
+     System.out.println("START testNonOfflineInitState at " + new 
Date(System.currentTimeMillis()));
+     String clusterName = getShortClassName();
+ 
+     setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+         "localhost", // participant name prefix
+         "TestDB", // resource name prefix
+         1, // resources
+         10, // partitions per resource
+         5, // number of nodes
+         1, // replicas
+         "Bootstrap", true); // do rebalance
+ 
+     ClusterControllerManager controller =
+         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+     controller.syncStart();
+ 
+     // start participants
+     MockParticipantManager[] participants = new MockParticipantManager[5];
+     for (int i = 0; i < 5; i++) {
+       String instanceName = "localhost_" + (12918 + i);
+ 
+       participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+ 
+       // add a state model with non-OFFLINE initial state
+       StateMachineEngine stateMach = participants[i].getStateMachineEngine();
+       MockBootstrapModelFactory bootstrapFactory = new 
MockBootstrapModelFactory();
+       stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
+ 
+       participants[i].syncStart();
+     }
+ 
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
+             clusterName));
+     Assert.assertTrue(result);
+ 
+     // clean up
+     controller.syncStop();
+     for (int i = 0; i < 5; i++) {
+       participants[i].syncStop();
+     }
+ 
+     System.out.println("END testNonOfflineInitState at " + new 
Date(System.currentTimeMillis()));
+   }
+ 
+   private static void setupCluster(String clusterName, String ZkAddr, int 
startPort,
+       String participantNamePrefix, String resourceNamePrefix, int 
resourceNb, int partitionNb,
+       int nodesNb, int replica, String stateModelDef, boolean doRebalance) 
throws Exception {
+     if (_gZkClient.exists("/" + clusterName)) {
+       LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
+       _gZkClient.deleteRecursive("/" + clusterName);
+     }
+ 
+     ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+     setupTool.addCluster(clusterName, true);
+     setupTool.addStateModelDef(clusterName, "Bootstrap",
+         TestHelper.generateStateModelDefForBootstrap());
+ 
+     for (int i = 0; i < nodesNb; i++) {
+       int port = startPort + i;
+       setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" 
+ port);
+     }
+ 
+     for (int i = 0; i < resourceNb; i++) {
+       String dbName = resourceNamePrefix + i;
+       setupTool.addResourceToCluster(clusterName, dbName, partitionNb, 
stateModelDef);
+       if (doRebalance) {
+         setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
+       }
+     }
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestParticipantErrorMessage.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/paticipant/TestParticipantErrorMessage.java
index 0000000,95b78e5..7945c4b
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestParticipantErrorMessage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestParticipantErrorMessage.java
@@@ -1,0 -1,99 +1,99 @@@
+ package org.apache.helix.integration.paticipant;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.UUID;
+ 
+ import org.apache.helix.Criteria;
+ import org.apache.helix.InstanceType;
+ import org.apache.helix.PropertyKey.Builder;
+ import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+ import 
org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory;
+ import 
org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageType;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+ import org.apache.log4j.Logger;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ 
+ public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase {
+   private static Logger LOG = 
Logger.getLogger(TestParticipantErrorMessage.class);
+ 
+   @Test()
+   public void TestParticipantErrorMessageSend() {
+     String participant1 = "localhost_" + START_PORT;
+     String participant2 = "localhost_" + (START_PORT + 1);
+ 
+     Message errorMessage1 =
+         new Message(MessageType.PARTICIPANT_ERROR_REPORT, 
UUID.randomUUID().toString());
+     errorMessage1.setTgtSessionId("*");
+     errorMessage1.getRecord().setSimpleField(
+         DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY,
+         ActionOnError.DISABLE_INSTANCE.toString());
+     Criteria recipientCriteria = new Criteria();
+     recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
+     recipientCriteria.setSessionSpecific(false);
+     _participants[0].getMessagingService().send(recipientCriteria,
+         errorMessage1);
+ 
+     Message errorMessage2 =
+         new Message(MessageType.PARTICIPANT_ERROR_REPORT, 
UUID.randomUUID().toString());
+     errorMessage2.setTgtSessionId("*");
+     errorMessage2.setResourceName("TestDB");
+     errorMessage2.setPartitionName("TestDB_14");
+     errorMessage2.getRecord().setSimpleField(
+         DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY,
+         ActionOnError.DISABLE_PARTITION.toString());
+     Criteria recipientCriteria2 = new Criteria();
+     recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
+     recipientCriteria2.setSessionSpecific(false);
+     _participants[1].getMessagingService().send(recipientCriteria2,
+         errorMessage2);
+ 
+     try {
+       Thread.sleep(1500);
+     } catch (InterruptedException e) {
+       LOG.error("Interrupted sleep", e);
+     }
+ 
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
+             CLUSTER_NAME));
+     Assert.assertTrue(result);
+     Builder kb = _participants[1].getHelixDataAccessor().keyBuilder();
+     ExternalView externalView =
+         _participants[1].getHelixDataAccessor().getProperty(
+             kb.externalView("TestDB"));
+ 
+     for (String partitionName : 
externalView.getRecord().getMapFields().keySet()) {
+       for (String hostName : 
externalView.getRecord().getMapField(partitionName).keySet()) {
+         if (hostName.equals(participant1)) {
+           
Assert.assertTrue(externalView.getRecord().getMapField(partitionName).get(hostName)
+               .equalsIgnoreCase("OFFLINE"));
+         }
+       }
+     }
+     
Assert.assertTrue(externalView.getRecord().getMapField("TestDB_14").get(participant2)
+         .equalsIgnoreCase("OFFLINE"));
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestRestartParticipant.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/paticipant/TestRestartParticipant.java
index 0000000,1fe742e..da8a4bc
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestRestartParticipant.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestRestartParticipant.java
@@@ -1,0 -1,118 +1,118 @@@
+ package org.apache.helix.integration.paticipant;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.Date;
+ import java.util.concurrent.atomic.AtomicReference;
+ 
+ import org.apache.helix.NotificationContext;
+ import org.apache.helix.TestHelper;
+ import org.apache.helix.integration.common.ZkIntegrationTestBase;
+ import org.apache.helix.integration.manager.ClusterControllerManager;
+ import org.apache.helix.integration.manager.MockParticipantManager;
+ import org.apache.helix.mock.participant.MockTransition;
+ import org.apache.helix.model.Message;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ 
+ public class TestRestartParticipant extends ZkIntegrationTestBase {
+   public class KillOtherTransition extends MockTransition {
+     final AtomicReference<MockParticipantManager> _other;
+ 
+     public KillOtherTransition(MockParticipantManager other) {
+       _other = new AtomicReference<MockParticipantManager>(other);
+     }
+ 
+     @Override
+     public void doTransition(Message message, NotificationContext context) {
+       MockParticipantManager other = _other.getAndSet(null);
+       if (other != null) {
+         System.err.println("Kill " + other.getInstanceName()
+             + ". Interrupted exceptions are IGNORABLE");
+         other.syncStop();
+       }
+     }
+   }
+ 
+   @Test()
+   public void testRestartParticipant() throws Exception {
+     // Logger.getRootLogger().setLevel(Level.INFO);
+     System.out.println("START testRestartParticipant at " + new 
Date(System.currentTimeMillis()));
+ 
+     String clusterName = getShortClassName();
+     MockParticipantManager[] participants = new MockParticipantManager[5];
+ 
+     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+         "localhost", // participant name prefix
+         "TestDB", // resource name prefix
+         1, // resources
+         10, // partitions per resource
+         5, // number of nodes
+         3, // replicas
+         "MasterSlave", true); // do rebalance
+ 
+     ClusterControllerManager controller =
+         new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+     controller.syncStart();
+ 
+     // start participants
+     for (int i = 0; i < 5; i++) {
+       String instanceName = "localhost_" + (12918 + i);
+ 
+       if (i == 4) {
+         participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+         participants[i].setTransition(new 
KillOtherTransition(participants[0]));
+       } else {
+         participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+       }
+ 
+       participants[i].syncStart();
+     }
+ 
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
+             clusterName));
+     Assert.assertTrue(result);
+ 
+     // restart
+     Thread.sleep(500);
+     MockParticipantManager participant =
+         new MockParticipantManager(ZK_ADDR, participants[0].getClusterName(),
+             participants[0].getInstanceName());
+     System.err.println("Restart " + participant.getInstanceName());
+     participant.syncStart();
+     result =
+         ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
+             clusterName));
+     Assert.assertTrue(result);
+ 
+     // clean up
+     controller.syncStop();
+     for (int i = 0; i < 5; i++) {
+       participants[i].syncStop();
+     }
+     participant.syncStop();
+ 
+     System.out.println("START testRestartParticipant at " + new 
Date(System.currentTimeMillis()));
+ 
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
index 0000000,bd01067..772fcac
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
@@@ -1,0 -1,184 +1,184 @@@
+ package org.apache.helix.integration.paticipant;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.Collection;
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.NotificationContext;
+ import org.apache.helix.PropertyKey.Builder;
+ import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+ import org.apache.helix.integration.manager.ClusterControllerManager;
+ import org.apache.helix.integration.manager.MockParticipantManager;
+ import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
+ import org.apache.helix.mock.participant.MockMSStateModel;
+ import org.apache.helix.mock.participant.MockTransition;
+ import org.apache.helix.mock.participant.SleepTransition;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.participant.statemachine.StateModelFactory;
+ import org.apache.helix.participant.statemachine.StateModelInfo;
+ import org.apache.helix.participant.statemachine.StateTransitionError;
+ import org.apache.helix.participant.statemachine.Transition;
+ import org.apache.helix.tools.ClusterSetup;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.MasterNbInExtViewVerifier;
+ import org.apache.log4j.Logger;
+ import org.testng.Assert;
+ import org.testng.annotations.BeforeClass;
+ import org.testng.annotations.Test;
+ 
+ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase {
+   private static Logger LOG = 
Logger.getLogger(TestStateTransitionTimeout.class);
+ 
+   @Override
+   @BeforeClass
+   public void beforeClass() throws Exception {
+     System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+ 
+     String namespace = "/" + CLUSTER_NAME;
+     if (_gZkClient.exists(namespace)) {
+       _gZkClient.deleteRecursive(namespace);
+     }
+     _setupTool = new ClusterSetup(ZK_ADDR);
+ 
+     // setup storage cluster
+     _setupTool.addCluster(CLUSTER_NAME, true);
+     _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, 
STATE_MODEL);
+ 
+     for (int i = 0; i < NODE_NR; i++) {
+       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+     }
+     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
+ 
+     // Set the timeout values
+     IdealState idealState =
+         
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
+     String stateTransition = "SLAVE" + "-" + "MASTER" + "_" + 
Message.Attributes.TIMEOUT;
+     idealState.getRecord().setSimpleField(stateTransition, "300");
+ 
+     String command =
+         "-zkSvr " + ZK_ADDR + " -addResourceProperty " + CLUSTER_NAME + " " + 
TEST_DB + " "
+             + stateTransition + " 200";
+     ClusterSetup.processCommandLineArgs(command.split(" "));
+   }
+ 
+   @StateModelInfo(initialState = "OFFLINE", states = {
+       "MASTER", "SLAVE", "ERROR"
+   })
+   public static class TimeOutStateModel extends MockMSStateModel {
+     boolean _sleep = false;
+     StateTransitionError _error;
+     int _errorCallcount = 0;
+ 
+     public TimeOutStateModel(MockTransition transition, boolean sleep) {
+       super(transition);
+       _sleep = sleep;
+     }
+ 
+     @Transition(to = "MASTER", from = "SLAVE")
+     public void onBecomeMasterFromSlave(Message message, NotificationContext 
context)
+         throws InterruptedException {
+       LOG.info("Become MASTER from SLAVE");
+       if (_transition != null && _sleep) {
+         _transition.doTransition(message, context);
+       }
+     }
+ 
+     @Override
+     public void rollbackOnError(Message message, NotificationContext context,
+         StateTransitionError error) {
+       _error = error;
+       _errorCallcount++;
+     }
+   }
+ 
+   public static class SleepStateModelFactory extends 
StateModelFactory<TimeOutStateModel> {
+     Set<String> partitionsToSleep = new HashSet<String>();
+     int _sleepTime;
+ 
+     public SleepStateModelFactory(int sleepTime) {
+       _sleepTime = sleepTime;
+     }
+ 
+     public void setPartitions(Collection<String> partitions) {
+       partitionsToSleep.addAll(partitions);
+     }
+ 
+     public void addPartition(String partition) {
+       partitionsToSleep.add(partition);
+     }
+ 
+     @Override
+     public TimeOutStateModel createNewStateModel(String resource, String 
stateUnitKey) {
+       return new TimeOutStateModel(new SleepTransition(_sleepTime),
+           partitionsToSleep.contains(stateUnitKey));
+     }
+   }
+ 
+   @Test
+   public void testStateTransitionTimeOut() throws Exception {
+     Map<String, SleepStateModelFactory> factories = new HashMap<String, 
SleepStateModelFactory>();
+     IdealState idealState =
+         
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
+     for (int i = 0; i < NODE_NR; i++) {
+       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+       SleepStateModelFactory factory = new SleepStateModelFactory(1000);
+       factories.put(instanceName, factory);
+       for (String p : idealState.getPartitionSet()) {
+         if (idealState.getPreferenceList(p).get(0).equals(instanceName)) {
+           factory.addPartition(p);
+         }
+       }
+ 
+       _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, 
instanceName);
+       
_participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
 factory);
+       _participants[i].syncStart();
+     }
+     String controllerName = CONTROLLER_PREFIX + "_0";
+     _controller =
+         new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+     _controller.syncStart();
+ 
+     boolean result =
+         ClusterStateVerifier
+             .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME));
+     Assert.assertTrue(result);
+     HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
+ 
+     Builder kb = accessor.keyBuilder();
+     ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB));
+     for (String p : idealState.getPartitionSet()) {
+       String idealMaster = idealState.getPreferenceList(p).get(0);
+       Assert.assertTrue(ev.getStateMap(p).get(idealMaster).equals("ERROR"));
+ 
+       TimeOutStateModel model = 
factories.get(idealMaster).getStateModel(TEST_DB, p);
+       Assert.assertEquals(model._errorCallcount, 1);
+       Assert.assertEquals(model._error.getCode(), ErrorCode.TIMEOUT);
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
index 0000000,ec73252..35d57c4
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalancePartitionLimit.java
@@@ -1,0 -1,234 +1,234 @@@
+ package org.apache.helix.integration.rebalancer;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import java.util.Date;
+ import java.util.HashMap;
+ import java.util.Map;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.PropertyKey.Builder;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.controller.stages.ClusterDataCache;
+ import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+ import org.apache.helix.integration.manager.ClusterControllerManager;
+ import org.apache.helix.integration.manager.MockParticipantManager;
+ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+ import org.apache.helix.manager.zk.ZkClient;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState.RebalanceMode;
+ import org.apache.helix.tools.ClusterSetup;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.ZkVerifier;
+ import org.apache.log4j.Logger;
+ import org.testng.Assert;
+ import org.testng.annotations.BeforeClass;
+ import org.testng.annotations.Test;
+ 
+ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBase {
+   private static final Logger LOG = 
Logger.getLogger(TestAutoRebalancePartitionLimit.class
+       .getName());
+ 
+   @Override
+   @BeforeClass
+   public void beforeClass() throws Exception {
+     // Logger.getRootLogger().setLevel(Level.INFO);
+     System.out.println("START " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+ 
+     String namespace = "/" + CLUSTER_NAME;
+     if (_gZkClient.exists(namespace)) {
+       _gZkClient.deleteRecursive(namespace);
+     }
+     _setupTool = new ClusterSetup(ZK_ADDR);
+ 
+     // setup storage cluster
+     _setupTool.addCluster(CLUSTER_NAME, true);
+ 
+     _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, 100, 
"OnlineOffline",
+         RebalanceMode.FULL_AUTO + "", 0, 25);
+     for (int i = 0; i < NODE_NR; i++) {
+       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+     }
+     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 1);
+ 
+     // start controller
+     String controllerName = CONTROLLER_PREFIX + "_0";
+     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+     _controller.syncStart();
+ 
+     HelixManager manager = _controller; // 
_startCMResultMap.get(controllerName)._manager;
+     HelixDataAccessor accessor = manager.getHelixDataAccessor();
+     // start dummy participants
+     for (int i = 0; i < NODE_NR; i++) {
+       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+       _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, 
instanceName);
+       _participants[i].syncStart();
+       Thread.sleep(2000);
+       boolean result =
+           ClusterStateVerifier.verifyByZkCallback(new 
ExternalViewBalancedVerifier(_gZkClient,
+               CLUSTER_NAME, TEST_DB));
+       Assert.assertTrue(result);
+       ExternalView ev =
+           
manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+       System.out.println(ev.getPartitionSet().size());
+       if (i < 3) {
+         Assert.assertEquals(ev.getPartitionSet().size(), 25 * (i + 1));
+       } else {
+         Assert.assertEquals(ev.getPartitionSet().size(), 100);
+       }
+     }
+ 
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
ExternalViewBalancedVerifier(_gZkClient,
+             CLUSTER_NAME, TEST_DB));
+ 
+     Assert.assertTrue(result);
+   }
+ 
+   @Test()
+   public void testAutoRebalanceWithMaxPartitionPerNode() throws Exception {
+     HelixManager manager = _controller;
+     // kill 1 node
+     _participants[0].syncStop();
+ 
+     // verifyBalanceExternalView();
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
ExternalViewBalancedVerifier(_gZkClient,
+             CLUSTER_NAME, TEST_DB));
+     Assert.assertTrue(result);
+     HelixDataAccessor accessor = manager.getHelixDataAccessor();
+     ExternalView ev =
+         
manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+     Assert.assertEquals(ev.getPartitionSet().size(), 100);
+ 
+     _participants[1].syncStop();
+ 
+     // verifyBalanceExternalView();
+     result =
+         ClusterStateVerifier.verifyByZkCallback(new 
ExternalViewBalancedVerifier(_gZkClient,
+             CLUSTER_NAME, TEST_DB));
+     Assert.assertTrue(result);
+     ev = 
manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().externalView(TEST_DB));
+     Assert.assertEquals(ev.getPartitionSet().size(), 75);
+ 
+     // add 2 nodes
+     for (int i = 0; i < 2; i++) {
+       String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
+       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ 
+       String newInstanceName = storageNodeName.replace(':', '_');
+       MockParticipantManager participant =
+           new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newInstanceName);
+       participant.syncStart();
+     }
+ 
+     Thread.sleep(1000);
+     result =
+         ClusterStateVerifier.verifyByZkCallback(new 
ExternalViewBalancedVerifier(_gZkClient,
+             CLUSTER_NAME, TEST_DB));
+     Assert.assertTrue(result);
+   }
+ 
+   static boolean verifyBalanceExternalView(ZNRecord externalView, int 
partitionCount,
+       String masterState, int replica, int instances, int maxPerInstance) {
+     Map<String, Integer> masterPartitionsCountMap = new HashMap<String, 
Integer>();
+     for (String partitionName : externalView.getMapFields().keySet()) {
+       Map<String, String> assignmentMap = 
externalView.getMapField(partitionName);
+       // Assert.assertTrue(assignmentMap.size() >= replica);
+       for (String instance : assignmentMap.keySet()) {
+         if (assignmentMap.get(instance).equals(masterState)) {
+           if (!masterPartitionsCountMap.containsKey(instance)) {
+             masterPartitionsCountMap.put(instance, 0);
+           }
+           masterPartitionsCountMap.put(instance, 
masterPartitionsCountMap.get(instance) + 1);
+         }
+       }
+     }
+ 
+     int perInstancePartition = partitionCount / instances;
+ 
+     int totalCount = 0;
+     for (String instanceName : masterPartitionsCountMap.keySet()) {
+       int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
+       totalCount += instancePartitionCount;
+       if (!(instancePartitionCount == perInstancePartition
+           || instancePartitionCount == perInstancePartition + 1 || 
instancePartitionCount == maxPerInstance)) {
+         return false;
+       }
+       if (instancePartitionCount == maxPerInstance) {
+         continue;
+       }
+       if (instancePartitionCount == perInstancePartition + 1) {
+         if (partitionCount % instances == 0) {
+           return false;
+         }
+       }
+     }
+     if (totalCount == maxPerInstance * instances) {
+       return true;
+     }
+     if (partitionCount != totalCount) {
+       return false;
+     }
+     return true;
+ 
+   }
+ 
+   public static class ExternalViewBalancedVerifier implements ZkVerifier {
+     String _clusterName;
+     String _resourceName;
+ 
+     public ExternalViewBalancedVerifier(ZkClient client, String clusterName, 
String resourceName) {
+       _clusterName = clusterName;
+       _resourceName = resourceName;
+     }
+ 
+     @Override
+     public boolean verify() {
+       HelixDataAccessor accessor =
+           new ZKHelixDataAccessor(_clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+       Builder keyBuilder = accessor.keyBuilder();
+       int numberOfPartitions =
+           
accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
+               .size();
+       ClusterDataCache cache = new ClusterDataCache();
+       cache.refresh(accessor);
+       String masterValue =
+           
cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef())
+               .getStatesPriorityList().get(0);
+       int replicas = 
Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+       return 
verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName))
+           .getRecord(), numberOfPartitions, masterValue, replicas, 
cache.getLiveInstances().size(),
+           cache.getIdealState(_resourceName).getMaxPartitionsPerInstance());
+     }
+ 
+     @Override
+     public ZkClient getZkClient() {
+       return _gZkClient;
+     }
+ 
+     @Override
+     public String getClusterName() {
+       return _clusterName;
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
index 0000000,9098e8e..af2b5a1
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestCustomizedIdealStateRebalancer.java
@@@ -1,0 -1,204 +1,204 @@@
+ package org.apache.helix.integration.rebalancer;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import com.beust.jcommander.internal.Lists;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.HelixManager;
+ import org.apache.helix.PropertyKey.Builder;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.controller.rebalancer.Rebalancer;
+ import org.apache.helix.controller.stages.ClusterDataCache;
+ import org.apache.helix.controller.stages.CurrentStateOutput;
+ import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+ import org.apache.helix.manager.zk.ZkClient;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.IdealState.IdealStateProperty;
+ import org.apache.helix.model.IdealState.RebalanceMode;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.ZkVerifier;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ 
+ public class TestCustomizedIdealStateRebalancer extends 
ZkStandAloneCMTestBase {
+   String db2 = TEST_DB + "2";
+   static boolean testRebalancerCreated = false;
+   static boolean testRebalancerInvoked = false;
+ 
+   public static class TestRebalancer implements Rebalancer {
+ 
+     @Override
+     public void init(HelixManager manager) {
+       testRebalancerCreated = true;
+     }
+ 
+     @Override
+     public IdealState computeNewIdealState(String resourceName, IdealState 
currentIdealState,
+         CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+       testRebalancerInvoked = true;
+       List<String> liveNodes = 
Lists.newArrayList(clusterData.getLiveInstances().keySet());
+       int i = 0;
+       for (String partition : currentIdealState.getPartitionSet()) {
+         int index = i++ % liveNodes.size();
+         String instance = liveNodes.get(index);
+         currentIdealState.getPreferenceList(partition).clear();
+         currentIdealState.getPreferenceList(partition).add(instance);
+ 
+         currentIdealState.getInstanceStateMap(partition).clear();
+         currentIdealState.getInstanceStateMap(partition).put(instance, 
"MASTER");
+       }
+       currentIdealState.setReplicas("1");
+       return currentIdealState;
+     }
+   }
+ 
+   @Test
+   public void testCustomizedIdealStateRebalancer() throws 
InterruptedException {
+     _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "MasterSlave");
+     _setupTool.addResourceProperty(CLUSTER_NAME, db2,
+         IdealStateProperty.REBALANCER_CLASS_NAME.toString(),
+         TestCustomizedIdealStateRebalancer.TestRebalancer.class.getName());
+     _setupTool.addResourceProperty(CLUSTER_NAME, db2, 
IdealStateProperty.REBALANCE_MODE.toString(),
+         RebalanceMode.USER_DEFINED.toString());
+ 
+     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 3);
+ 
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
ExternalViewBalancedVerifier(_gZkClient,
+             CLUSTER_NAME, db2));
+     Assert.assertTrue(result);
+     Thread.sleep(1000);
+     HelixDataAccessor accessor =
+         new ZKHelixDataAccessor(CLUSTER_NAME, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+     Builder keyBuilder = accessor.keyBuilder();
+     ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
+     Assert.assertEquals(ev.getPartitionSet().size(), 60);
+     for (String partition : ev.getPartitionSet()) {
+       Assert.assertEquals(ev.getStateMap(partition).size(), 1);
+     }
+     IdealState is = accessor.getProperty(keyBuilder.idealStates(db2));
+     for (String partition : is.getPartitionSet()) {
+       Assert.assertEquals(is.getPreferenceList(partition).size(), 0);
+       Assert.assertEquals(is.getInstanceStateMap(partition).size(), 0);
+     }
+     Assert.assertTrue(testRebalancerCreated);
+     Assert.assertTrue(testRebalancerInvoked);
+   }
+ 
+   public static class ExternalViewBalancedVerifier implements ZkVerifier {
+     ZkClient _client;
+     String _clusterName;
+     String _resourceName;
+ 
+     public ExternalViewBalancedVerifier(ZkClient client, String clusterName, 
String resourceName) {
+       _client = client;
+       _clusterName = clusterName;
+       _resourceName = resourceName;
+     }
+ 
+     @Override
+     public boolean verify() {
+       try {
+         HelixDataAccessor accessor =
+             new ZKHelixDataAccessor(_clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_client));
+         Builder keyBuilder = accessor.keyBuilder();
+         int numberOfPartitions =
+             
accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields()
+                 .size();
+         ClusterDataCache cache = new ClusterDataCache();
+         cache.refresh(accessor);
+         String masterValue =
+             
cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef())
+                 .getStatesPriorityList().get(0);
+         int replicas = 
Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
+         String instanceGroupTag = 
cache.getIdealState(_resourceName).getInstanceGroupTag();
+         int instances = 0;
+         for (String liveInstanceName : cache.getLiveInstances().keySet()) {
+           if 
(cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag))
 {
+             instances++;
+           }
+         }
+         if (instances == 0) {
+           instances = cache.getLiveInstances().size();
+         }
+         return verifyBalanceExternalView(
+             
accessor.getProperty(keyBuilder.externalView(_resourceName)).getRecord(),
+             numberOfPartitions, masterValue, replicas, instances);
+       } catch (Exception e) {
+         return false;
+       }
+     }
+ 
+     @Override
+     public ZkClient getZkClient() {
+       return _client;
+     }
+ 
+     @Override
+     public String getClusterName() {
+       return _clusterName;
+     }
+ 
+   }
+ 
+   static boolean verifyBalanceExternalView(ZNRecord externalView, int 
partitionCount,
+       String masterState, int replica, int instances) {
+     Map<String, Integer> masterPartitionsCountMap = new HashMap<String, 
Integer>();
+     for (String partitionName : externalView.getMapFields().keySet()) {
+       Map<String, String> assignmentMap = 
externalView.getMapField(partitionName);
+       // Assert.assertTrue(assignmentMap.size() >= replica);
+       for (String instance : assignmentMap.keySet()) {
+         if (assignmentMap.get(instance).equals(masterState)) {
+           if (!masterPartitionsCountMap.containsKey(instance)) {
+             masterPartitionsCountMap.put(instance, 0);
+           }
+           masterPartitionsCountMap.put(instance, 
masterPartitionsCountMap.get(instance) + 1);
+         }
+       }
+     }
+ 
+     int perInstancePartition = partitionCount / instances;
+ 
+     int totalCount = 0;
+     for (String instanceName : masterPartitionsCountMap.keySet()) {
+       int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
+       totalCount += instancePartitionCount;
+       if (!(instancePartitionCount == perInstancePartition || 
instancePartitionCount == perInstancePartition + 1)) {
+         return false;
+       }
+       if (instancePartitionCount == perInstancePartition + 1) {
+         if (partitionCount % instances == 0) {
+           return false;
+         }
+       }
+     }
+     if (partitionCount != totalCount) {
+       return false;
+     }
+     return true;
+ 
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
index 0000000,335b99d..813ee69
mode 000000,100644..100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestFullAutoNodeTagging.java
@@@ -1,0 -1,478 +1,478 @@@
+ package org.apache.helix.integration.rebalancer;
+ 
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+ import java.util.Date;
+ import java.util.Map;
+ import java.util.Set;
+ import org.apache.helix.BaseDataAccessor;
+ import org.apache.helix.HelixAdmin;
+ import org.apache.helix.HelixDataAccessor;
+ import org.apache.helix.PropertyKey;
+ import org.apache.helix.TestHelper;
+ import org.apache.helix.TestHelper.Verifier;
+ import org.apache.helix.ZNRecord;
+ import org.apache.helix.ZkUnitTestBase;
+ import org.apache.helix.integration.manager.ClusterControllerManager;
+ import org.apache.helix.integration.manager.MockParticipantManager;
+ import org.apache.helix.manager.zk.ZKHelixAdmin;
+ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+ import org.apache.helix.manager.zk.ZkClient;
+ import org.apache.helix.model.ExternalView;
+ import org.apache.helix.model.IdealState;
+ import org.apache.helix.model.IdealState.RebalanceMode;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 -import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
++import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
++import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.ZkVerifier;
+ import org.apache.helix.util.ZKClientPool;
+ import org.apache.log4j.Logger;
+ import org.testng.Assert;
+ import org.testng.annotations.Test;
+ 
+ /**
+  * Test that node tagging behaves correctly in FULL_AUTO mode
+  */
+ public class TestFullAutoNodeTagging extends ZkUnitTestBase {
+   private static final Logger LOG = 
Logger.getLogger(TestFullAutoNodeTagging.class);
+ 
+   @Test
+   public void testUntag() throws Exception {
+     final int NUM_PARTICIPANTS = 2;
+     final int NUM_PARTITIONS = 4;
+     final int NUM_REPLICAS = 1;
+     final String RESOURCE_NAME = "TestResource0";
+     final String TAG = "ASSIGNABLE";
+ 
+     String className = TestHelper.getTestClassName();
+     String methodName = TestHelper.getTestMethodName();
+     final String clusterName = className + "_" + methodName;
+     System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+ 
+     // Set up cluster
+     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+         "localhost", // participant name prefix
+         "TestResource", // resource name prefix
+         1, // resources
+         NUM_PARTITIONS, // partitions per resource
+         NUM_PARTICIPANTS, // number of nodes
+         NUM_REPLICAS, // replicas
+         "OnlineOffline", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to 
test node tagging
+         true); // do rebalance
+ 
+     // Tag the resource
+     final HelixAdmin helixAdmin = new ZKHelixAdmin(_gZkClient);
+     IdealState idealState = helixAdmin.getResourceIdealState(clusterName, 
RESOURCE_NAME);
+     idealState.setInstanceGroupTag(TAG);
+     helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+ 
+     // Get a data accessor
+     final HelixDataAccessor accessor =
+         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+     final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ 
+     // Tag the participants
+     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+       final String instanceName = "localhost_" + (12918 + i);
+       helixAdmin.addInstanceTag(clusterName, instanceName, TAG);
+     }
+ 
+     // Start controller
+     ClusterControllerManager controller =
+         new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+     controller.syncStart();
+ 
+     // Start participants
+     MockParticipantManager[] participants = new 
MockParticipantManager[NUM_PARTICIPANTS];
+     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+       final String instanceName = "localhost_" + (12918 + i);
+ 
+       participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+       participants[i].syncStart();
+     }
+ 
+     // Verify that there are NUM_PARTITIONS partitions in the external view, 
each having
+     // NUM_REPLICAS replicas, where all assigned replicas are to tagged 
nodes, and they are all
+     // ONLINE.
+     Verifier v = new Verifier() {
+       @Override
+       public boolean verify() throws Exception {
+         ExternalView externalView =
+             pollForProperty(ExternalView.class, accessor, 
keyBuilder.externalView(RESOURCE_NAME),
+                 true);
+         if (externalView == null) {
+           return false;
+         }
+         Set<String> taggedInstances =
+             
Sets.newHashSet(helixAdmin.getInstancesInClusterWithTag(clusterName, TAG));
+         Set<String> partitionSet = externalView.getPartitionSet();
+         if (partitionSet.size() != NUM_PARTITIONS) {
+           return false;
+         }
+         for (String partitionName : partitionSet) {
+           Map<String, String> stateMap = 
externalView.getStateMap(partitionName);
+           if (stateMap.size() != NUM_REPLICAS) {
+             return false;
+           }
+           for (String participantName : stateMap.keySet()) {
+             if (!taggedInstances.contains(participantName)) {
+               return false;
+             }
+             String state = stateMap.get(participantName);
+             if (!state.equalsIgnoreCase("ONLINE")) {
+               return false;
+             }
+           }
+         }
+         return true;
+       }
+     };
+ 
+     // Run the verifier for both nodes tagged
+     boolean initialResult = TestHelper.verify(v, 10 * 1000);
+     Assert.assertTrue(initialResult);
+ 
+     // Untag a node
+     helixAdmin.removeInstanceTag(clusterName, "localhost_12918", TAG);
+ 
+     // Verify again
+     boolean finalResult = TestHelper.verify(v, 10 * 1000);
+     Assert.assertTrue(finalResult);
+ 
+     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+   }
+ 
+   /**
+    * Ensure that no assignments happen when there are no tagged nodes, but 
the resource is tagged
+    */
+   @Test
+   public void testResourceTaggedFirst() throws Exception {
+     final int NUM_PARTICIPANTS = 10;
+     final int NUM_PARTITIONS = 4;
+     final int NUM_REPLICAS = 2;
+     final String RESOURCE_NAME = "TestDB0";
+     final String TAG = "ASSIGNABLE";
+ 
+     String className = TestHelper.getTestClassName();
+     String methodName = TestHelper.getTestMethodName();
+     String clusterName = className + "_" + methodName;
+     System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+ 
+     // Set up cluster
+     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+         "localhost", // participant name prefix
+         "TestDB", // resource name prefix
+         1, // resources
+         NUM_PARTITIONS, // partitions per resource
+         NUM_PARTICIPANTS, // number of nodes
+         NUM_REPLICAS, // replicas
+         "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test 
node tagging
+         true); // do rebalance
+ 
+     // tag the resource
+     HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR);
+     IdealState idealState = helixAdmin.getResourceIdealState(clusterName, 
RESOURCE_NAME);
+     idealState.setInstanceGroupTag(TAG);
+     helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+ 
+     // start controller
+     ClusterControllerManager controller =
+         new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+     controller.syncStart();
+ 
+     // start participants
+     MockParticipantManager[] participants = new 
MockParticipantManager[NUM_PARTICIPANTS];
+     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+       final String instanceName = "localhost_" + (12918 + i);
+ 
+       participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+       participants[i].syncStart();
+     }
+ 
+     Thread.sleep(1000);
+     boolean result =
+         ClusterStateVerifier.verifyByZkCallback(new 
EmptyZkVerifier(clusterName, RESOURCE_NAME));
+     Assert.assertTrue(result, "External view and current state must be 
empty");
+ 
+     // cleanup
+     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+       participants[i].syncStop();
+     }
+     controller.syncStop();
+   }
+ 
+   /**
+    * Basic test for tagging behavior. 10 participants, of which 4 are tagged. 
Launch all 10,
+    * checking external view every time a tagged node is started. Then shut 
down all 10, checking
+    * external view every time a tagged node is killed.
+    */
+   @Test
+   public void testSafeAssignment() throws Exception {
+     final int NUM_PARTICIPANTS = 10;
+     final int NUM_PARTITIONS = 4;
+     final int NUM_REPLICAS = 2;
+     final String RESOURCE_NAME = "TestDB0";
+     final String TAG = "ASSIGNABLE";
+ 
+     final String[] TAGGED_NODES = {
+         "localhost_12920", "localhost_12922", "localhost_12924", 
"localhost_12925"
+     };
+     Set<String> taggedNodes = Sets.newHashSet(TAGGED_NODES);
+ 
+     String className = TestHelper.getTestClassName();
+     String methodName = TestHelper.getTestMethodName();
+     String clusterName = className + "_" + methodName;
+ 
+     System.out.println("START " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+ 
+     // Set up cluster
+     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+         "localhost", // participant name prefix
+         "TestDB", // resource name prefix
+         1, // resources
+         NUM_PARTITIONS, // partitions per resource
+         NUM_PARTICIPANTS, // number of nodes
+         NUM_REPLICAS, // replicas
+         "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test 
node tagging
+         true); // do rebalance
+ 
+     // tag the resource and participants
+     HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR);
+     for (String taggedNode : TAGGED_NODES) {
+       helixAdmin.addInstanceTag(clusterName, taggedNode, TAG);
+     }
+     IdealState idealState = helixAdmin.getResourceIdealState(clusterName, 
RESOURCE_NAME);
+     idealState.setInstanceGroupTag(TAG);
+     helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState);
+ 
+     // start controller
+     ClusterControllerManager controller =
+         new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+     controller.syncStart();
+ 
+     // start participants
+     MockParticipantManager[] participants = new 
MockParticipantManager[NUM_PARTICIPANTS];
+     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+       final String instanceName = "localhost_" + (12918 + i);
+ 
+       participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, 
instanceName);
+       participants[i].syncStart();
+ 
+       // ensure that everything is valid if this is a tagged node that is 
starting
+       if (taggedNodes.contains(instanceName)) {
+         // make sure that the best possible matches the external view
+         Thread.sleep(500);
+         boolean result =
+             ClusterStateVerifier.verifyByZkCallback(new 
BestPossAndExtViewZkVerifier(ZK_ADDR,
+                 clusterName));
+         Assert.assertTrue(result);
+ 
+         // make sure that the tagged state of the nodes is still balanced
+         result =
+             ClusterStateVerifier.verifyByZkCallback(new 
TaggedZkVerifier(clusterName,
+                 RESOURCE_NAME, TAGGED_NODES, false));
+         Assert.assertTrue(result, "initial assignment with all tagged nodes 
live is invalid");
+       }
+     }
+ 
+     // cleanup
+     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
+       String participantName = participants[i].getInstanceName();
+       participants[i].syncStop();
+       if (taggedNodes.contains(participantName)) {
+         // check that the external view is still correct even after removing 
tagged nodes
+         taggedNodes.remove(participantName);
+         Thread.sleep(500);
+         boolean result =
+             ClusterStateVerifier.verifyByZkCallback(new 
TaggedZkVerifier(clusterName,
+                 RESOURCE_NAME, TAGGED_NODES, taggedNodes.isEmpty()));
+         Assert.assertTrue(result, "incorrect state after removing " + 
participantName + ", "
+             + taggedNodes + " remain");
+       }
+     }
+     controller.syncStop();
+     System.out.println("END " + clusterName + " at " + new 
Date(System.currentTimeMillis()));
+   }
+ 
+   /**
+    * Checker for basic validity of the external view given node tagging 
requirements
+    */
+   private static class TaggedZkVerifier implements ZkVerifier {
+     private final String _clusterName;
+     private final String _resourceName;
+     private final String[] _taggedNodes;
+     private final boolean _isEmptyAllowed;
+     private final ZkClient _zkClient;
+ 
+     /**
+      * Create a verifier for a specific cluster and resource
+      * @param clusterName the cluster to verify
+      * @param resourceName the resource within the cluster to verify
+      * @param taggedNodes nodes tagged with the resource tag
+      * @param isEmptyAllowed true if empty assignments are legal
+      */
+     public TaggedZkVerifier(String clusterName, String resourceName, String[] 
taggedNodes,
+         boolean isEmptyAllowed) {
+       _clusterName = clusterName;
+       _resourceName = resourceName;
+       _taggedNodes = taggedNodes;
+       _isEmptyAllowed = isEmptyAllowed;
+       _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+     }
+ 
+     @Override
+     public boolean verify() {
+       BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+       HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, 
baseAccessor);
+       PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+       ExternalView externalView = 
accessor.getProperty(keyBuilder.externalView(_resourceName));
+ 
+       Set<String> taggedNodeSet = ImmutableSet.copyOf(_taggedNodes);
+ 
+       // set up counts of partitions, masters, and slaves per node
+       Map<String, Integer> partitionCount = Maps.newHashMap();
+       int partitionSum = 0;
+       Map<String, Integer> masterCount = Maps.newHashMap();
+       int masterSum = 0;
+       Map<String, Integer> slaveCount = Maps.newHashMap();
+       int slaveSum = 0;
+ 
+       for (String partitionName : externalView.getPartitionSet()) {
+         Map<String, String> stateMap = 
externalView.getStateMap(partitionName);
+         for (String participantName : stateMap.keySet()) {
+           String state = stateMap.get(participantName);
+           if (state.equalsIgnoreCase("MASTER") || 
state.equalsIgnoreCase("SLAVE")) {
+             partitionSum++;
+             incrementCount(partitionCount, participantName);
+             if (!taggedNodeSet.contains(participantName)) {
+               // not allowed to have a non-tagged node assigned
+               LOG.error("Participant " + participantName + " is not tag, but 
has an assigned node");
+               return false;
+             } else if (state.equalsIgnoreCase("MASTER")) {
+               masterSum++;
+               incrementCount(masterCount, participantName);
+             } else if (state.equalsIgnoreCase("SLAVE")) {
+               slaveSum++;
+               incrementCount(slaveCount, participantName);
+             }
+           }
+         }
+       }
+ 
+       // check balance in partitions per node
+       if (partitionCount.size() > 0) {
+         boolean partitionMapDividesEvenly = partitionSum % 
partitionCount.size() == 0;
+         boolean withinAverage =
+             withinAverage(partitionCount, _isEmptyAllowed, 
partitionMapDividesEvenly);
+         if (!withinAverage) {
+           LOG.error("partition counts deviate from average");
+           return false;
+         }
+       } else {
+         if (!_isEmptyAllowed) {
+           LOG.error("partition assignments are empty");
+           return false;
+         }
+       }
+ 
+       // check balance in masters per node
+       if (masterCount.size() > 0) {
+         boolean masterMapDividesEvenly = masterSum % masterCount.size() == 0;
+         boolean withinAverage = withinAverage(masterCount, _isEmptyAllowed, 
masterMapDividesEvenly);
+         if (!withinAverage) {
+           LOG.error("master counts deviate from average");
+           return false;
+         }
+       } else {
+         if (!_isEmptyAllowed) {
+           LOG.error("master assignments are empty");
+           return false;
+         }
+       }
+ 
+       // check balance in slaves per node
+       if (slaveCount.size() > 0) {
+         boolean slaveMapDividesEvenly = slaveSum % slaveCount.size() == 0;
+         boolean withinAverage = withinAverage(slaveCount, true, 
slaveMapDividesEvenly);
+         if (!withinAverage) {
+           LOG.error("slave counts deviate from average");
+           return false;
+         }
+       }
+       return true;
+     }
+ 
+     private void incrementCount(Map<String, Integer> countMap, String key) {
+       if (!countMap.containsKey(key)) {
+         countMap.put(key, 0);
+       }
+       countMap.put(key, countMap.get(key) + 1);
+     }
+ 
+     private boolean withinAverage(Map<String, Integer> countMap, boolean 
isEmptyAllowed,
+         boolean dividesEvenly) {
+       if (countMap.size() == 0) {
+         if (!isEmptyAllowed) {
+           LOG.error("Map not allowed to be empty");
+           return false;
+         }
+         return true;
+       }
+       int upperBound = 1;
+       if (!dividesEvenly) {
+         upperBound = 2;
+       }
+       int average = computeAverage(countMap);
+       for (String participantName : countMap.keySet()) {
+         int count = countMap.get(participantName);
+         if (count < average - 1 || count > average + upperBound) {
+           LOG.error("Count " + count + " for " + participantName + " too far 
from average of "
+               + average);
+           return false;
+         }
+       }
+       return true;
+     }
+ 
+     private int computeAverage(Map<String, Integer> countMap) {
+       if (countMap.size() == 0) {
+         return -1;
+       }
+       int total = 0;
+       for (int value : countMap.values()) {
+         total += value;
+       }
+       return total / countMap.size();
+     }
+ 
+     @Override
+     public ZkClient getZkClient() {
+       return _zkClient;
+     }
+ 
+     @Override
+     public String getClusterName() {
+       return _clusterName;
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
index 7fb9746,0cf1ca3..53db79b
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestLiveInstanceBounce.java
@@@ -19,9 -19,9 +19,9 @@@ package org.apache.helix.manager.zk
   * under the License.
   */
  
- import org.apache.helix.integration.ZkStandAloneCMTestBase;
+ import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
  import org.apache.helix.integration.manager.MockParticipantManager;
 -import org.apache.helix.tools.ClusterStateVerifier;
 +import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
  import org.testng.Assert;
  import org.testng.annotations.Test;
  

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --cc 
helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 735325f,29df0b7..6808292
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@@ -34,9 -34,10 +34,10 @@@ import org.apache.helix.integration.man
  import org.apache.helix.integration.manager.MockParticipantManager;
  import org.apache.helix.model.IdealState;
  import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
+ import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
  import org.apache.helix.tools.ClusterSetup;
 -import org.apache.helix.tools.ClusterStateVerifier;
 -import 
org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 +import org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier;
 +import 
org.apache.helix.tools.ClusterVerifiers.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
  import org.apache.log4j.Logger;
  import org.testng.Assert;
  import org.testng.annotations.AfterClass;

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResetClusterMetrics.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/helix/blob/fe0b6e80/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
----------------------------------------------------------------------

Reply via email to